qio: Add callbacks for unblocking queues
[akaros.git] / kern / src / ns / qio.c
index 4741f6c..14a7f5e 100644 (file)
@@ -56,6 +56,8 @@ struct queue {
        struct rendez rr;                       /* process waiting to read */
        qlock_t wlock;                          /* mutex for writing processes */
        struct rendez wr;                       /* process waiting to write */
+       qio_wake_cb_t wake_cb;          /* callbacks for qio wakeups */
+       void *wake_data;
 
        char err[ERRMAX];
 };
@@ -66,6 +68,13 @@ enum {
 
 unsigned int qiomaxatomic = Maxatomic;
 
+/* Helper: fires a wake callback, sending 'filter' */
+static void qwake_cb(struct queue *q, int filter)
+{
+       if (q->wake_cb)
+               q->wake_cb(q, q->wake_data, filter);
+}
+
 void ixsummary(void)
 {
        debugging ^= 1;
@@ -577,8 +586,11 @@ struct block *qget(struct queue *q)
 
        spin_unlock_irqsave(&q->lock);
 
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->wr);
+               /* We only send the writable event on wakeup, which is edge triggered */
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
 
        return b;
 }
@@ -655,8 +667,10 @@ int qdiscard(struct queue *q, int len)
 
        spin_unlock_irqsave(&q->lock);
 
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
 
        return sofar;
 }
@@ -723,8 +737,10 @@ int qconsume(struct queue *q, void *vp, int len)
 
        spin_unlock_irqsave(&q->lock);
 
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
+       }
 
        if (tofree != NULL)
                freeblist(tofree);
@@ -778,8 +794,10 @@ int qpass(struct queue *q, struct block *b)
        }
        spin_unlock_irqsave(&q->lock);
 
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->rr);
+               qwake_cb(q, FDTAP_FILT_READABLE);
+       }
 
        return len;
 }
@@ -825,8 +843,10 @@ int qpassnolim(struct queue *q, struct block *b)
        }
        spin_unlock_irqsave(&q->lock);
 
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->rr);
+               qwake_cb(q, FDTAP_FILT_READABLE);
+       }
 
        return len;
 }
@@ -911,8 +931,10 @@ int qproduce(struct queue *q, void *vp, int len)
                q->state |= Qflow;
        spin_unlock_irqsave(&q->lock);
 
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->rr);
+               qwake_cb(q, FDTAP_FILT_READABLE);
+       }
 
        return len;
 }
@@ -1398,6 +1420,7 @@ static void qwakeup_iunlock(struct queue *q)
                if (q->kick)
                        q->kick(q->arg);
                rendez_wakeup(&q->wr);
+               qwake_cb(q, FDTAP_FILT_WRITABLE);
        }
 }
 
@@ -1620,8 +1643,10 @@ long qbwrite(struct queue *q, struct block *b)
                q->kick(q->arg);
 
        /* wakeup anyone consuming at the other end */
-       if (dowakeup)
+       if (dowakeup) {
                rendez_wakeup(&q->rr);
+               qwake_cb(q, FDTAP_FILT_READABLE);
+       }
 
        /*
         *  flow control, wait for queue to get below the limit
@@ -1680,6 +1705,7 @@ long qibwrite(struct queue *q, struct block *b)
                if (q->kick)
                        q->kick(q->arg);
                rendez_wakeup(&q->rr);
+               qwake_cb(q, FDTAP_FILT_READABLE);
        }
 
        return n;
@@ -1807,6 +1833,7 @@ void qclose(struct queue *q)
        /* wake up readers/writers */
        rendez_wakeup(&q->rr);
        rendez_wakeup(&q->wr);
+       qwake_cb(q, FDTAP_FILT_HANGUP);
 }
 
 /*
@@ -1827,6 +1854,7 @@ void qhangup(struct queue *q, char *msg)
        /* wake up readers/writers */
        rendez_wakeup(&q->rr);
        rendez_wakeup(&q->wr);
+       qwake_cb(q, FDTAP_FILT_HANGUP);
 }
 
 /*
@@ -1838,7 +1866,7 @@ int qisclosed(struct queue *q)
 }
 
 /*
- *  mark a queue as no longer hung up
+ *  mark a queue as no longer hung up.  resets the wake_cb.
  */
 void qreopen(struct queue *q)
 {
@@ -1847,6 +1875,8 @@ void qreopen(struct queue *q)
        q->state |= Qstarve;
        q->eof = 0;
        q->limit = q->inilim;
+       q->wake_cb = 0;
+       q->wake_data = 0;
        spin_unlock_irqsave(&q->lock);
 }
 
@@ -1925,8 +1955,9 @@ void qflush(struct queue *q)
        /* free queued blocks */
        freeblist(bfirst);
 
-       /* wake up readers/writers */
+       /* wake up writers */
        rendez_wakeup(&q->wr);
+       qwake_cb(q, FDTAP_FILT_WRITABLE);
 }
 
 int qfull(struct queue *q)
@@ -1945,3 +1976,20 @@ void qdump(struct queue *q)
                printk("q=%p bfirst=%p blast=%p len=%d dlen=%d limit=%d state=#%x\n",
                           q, q->bfirst, q->blast, q->len, q->dlen, q->limit, q->state);
 }
+
+/* On certain wakeup events, qio will call func(q, data, filter), where filter
+ * marks the type of wakeup event (flags from FDTAP).
+ *
+ * There's no sync protection.  If you change the CB while the qio is running,
+ * you might get a CB with the data or func from a previous set_wake_cb.  You
+ * should set this once per queue and forget it.
+ *
+ * You can remove the CB by passing in 0 for the func.  Alternatively, you can
+ * just make sure that the func(data) pair are valid until the queue is freed or
+ * reopened. */
+void qio_set_wake_cb(struct queue *q, qio_wake_cb_t func, void *data)
+{
+       q->wake_data = data;
+       wmb();  /* if we see func, we'll also see the data for it */
+       q->wake_cb = func;
+}