RFC: support pipe reads based on a condition.
authorRonald G. Minnich <rminnich@gmail.com>
Tue, 17 Sep 2013 06:17:26 +0000 (23:17 -0700)
committerBarret Rhoden <brho@cs.berkeley.edu>
Thu, 16 Jan 2014 02:01:23 +0000 (18:01 -0800)
This is a proposal for pipe reading conditioned on a function. It is often
necessary to read lock a pipe and read data from it until some condition is met.
It is essential that we keep the lock for our own uses until we are done, to ensure
integral copyout of possibly fragmented RPC and other requests. Note this
was not an issue with the Plan 9 'il' protocol, became an issue with IP V4,
and will not be an issue with IP V6. But we have to support v4, even though
we're supposed to be beyond all that now. Legacy sucks.

The apipe_read_cond is called with a function, f, and an arg. Each time the
pipe is non-empty, f is called with the pipe and arg as parameters. F returns
0 until some condition is met, in which case it returns > 0, or until failure,
in which case it returns < 0. f may read data from the pipe by calling
apipe_read_locked.

If this design meets with approval (ha!) then I can create a qio function that
uses these semantics. But, I expect the Akaros guys will do something
better :-)

Signed-off-by: Ronald G. Minnich <rminnich@gmail.com>
kern/include/apipe.h
kern/src/apipe.c

index bb63ca5..584e2eb 100644 (file)
@@ -69,6 +69,8 @@ struct atomic_pipe {
 void apipe_init(struct atomic_pipe *ap, void *buf, size_t buf_sz,
                 size_t elem_sz);
 int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem);
+int apipe_read_cond(struct atomic_pipe *ap,
+                   int(*f)(struct atomic_pipe *pipe, void *arg), void *arg);
 int apipe_write(struct atomic_pipe *ap, void *buf, size_t nr_elem);
 void *apipe_head(struct atomic_pipe *ap);
 
index d44220c..341c303 100644 (file)
@@ -58,20 +58,12 @@ void apipe_close_writer(struct atomic_pipe *ap)
        cv_unlock(&ap->ap_cv);
 }
 
-int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
+/* read a pipe that is already locked. */
+int apipe_read_locked(struct atomic_pipe *ap, void *buf, size_t nr_elem)
 {
        size_t rd_idx;
        int nr_copied = 0;
 
-       cv_lock(&ap->ap_cv);
-       while (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
-               if (!ap->ap_nr_writers) {
-                       cv_unlock(&ap->ap_cv);
-                       return 0;
-               }
-               cv_wait(&ap->ap_cv);
-               cpu_relax();
-       }
        for (int i = 0; i < nr_elem; i++) {
                /* power of 2 elements in the ring buffer, index is the lower n bits */
                rd_idx = ap->ap_rd_off & (ap->ap_ring_sz - 1);
@@ -82,6 +74,25 @@ int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
                if (__ring_empty(ap->ap_wr_off, ap->ap_rd_off))
                        break;
        }
+       return nr_copied;
+}
+
+
+int apipe_read(struct atomic_pipe *ap, void *buf, size_t nr_elem)
+{
+       size_t rd_idx;
+       int nr_copied = 0;
+
+       cv_lock(&ap->ap_cv);
+       while (__ring_empty(ap->ap_wr_off, ap->ap_rd_off)) {
+               if (!ap->ap_nr_writers) {
+                       cv_unlock(&ap->ap_cv);
+                       return 0;
+               }
+               cv_wait(&ap->ap_cv);
+               cpu_relax();
+       }
+       nr_copied = apipe_read_locked(ap, buf, nr_elem);
        /* since we're using just one CV, there could be readers and writers blocked
         * on it.  need to wake them all, to make sure we signal any blocked
         * writers. */
@@ -129,3 +140,45 @@ void *apipe_head(struct atomic_pipe *ap)
                return 0;
        return ap->ap_buf + (ap->ap_rd_off & (ap->ap_ring_sz - 1)) * ap->ap_elem_sz;
 }
+
+/* 
+ * Read data from the pipe until a condition is satisfied.
+ * f is the function that determines the condition. f saves its
+ * state in arg. When f returns non-zero, this function exits,
+ * and returns the value to its caller. Note that f can return -1
+ * to indicate an error. But returning zero will keep you trapped in
+ * this function. The intent here is to ensure one-reader-at-a-time
+ * operation.
+ */
+int apipe_read_cond(struct atomic_pipe *ap,
+                   int(*f)(struct atomic_pipe *pipe, void *arg), void *arg)
+{
+       size_t rd_idx;
+       int ret;
+
+       cv_lock(&ap->ap_cv);
+       while (1){
+               /* Each time there is a need to check the pipe, call
+                * f. f will maintain its state in arg. It is expected that f
+                * will dequeue elements from the pipe as they are available.
+                * N.B. this is being done for protocols like IPV4 that can
+                * fragment an RPC request. For IPV6, it is likely that this
+                * will end up looking like a blocking read. Thus was it ever
+                * with legacy code. F is supposed to call apipe_read_locked().
+                */
+               /* at any point, if nr_writers goes to zero, that's bad. */
+               if (!ap->ap_nr_writers) {
+                       cv_unlock(&ap->ap_cv);
+                       /* return -1 because they're going to have to clean up. */
+                       return -1;
+               }
+               ret = f(ap, arg);
+               if (ret)
+                       break;
+               cv_wait(&ap->ap_cv);
+               cpu_relax();
+       }
+       cv_unlock(&ap->ap_cv);
+       return ret;
+}
+