qio: Track the amount of bytes read
[akaros.git] / kern / src / net / devip.c
index 2bc6160..410f94d 100644 (file)
@@ -83,6 +83,8 @@ enum {
        Shiftproto = Logtype + Logconv,
 
        Nfs = 32,
+       BYPASS_QMAX = 64 * MiB,
+       IPROUTE_LEN = 2 * PGSIZE,
 };
 #define TYPE(x)        ( ((uint32_t)(x).path) & Masktype )
 #define CONV(x)        ( (((uint32_t)(x).path) >> Shiftconv) & Maskconv )
@@ -96,9 +98,11 @@ struct queue *qlog;
 
 extern void nullmediumlink(void);
 extern void pktmediumlink(void);
-extern char *eve;
+extern struct username eve;
 static long ndbwrite(struct Fs *, char *unused_char_p_t, uint32_t, int);
 static void closeconv(struct conv *);
+static void setup_proto_qio_bypass(struct conv *cv);
+static void undo_proto_qio_bypass(struct conv *cv);
 
 static struct conv *chan2conv(struct chan *chan)
 {
@@ -132,7 +136,7 @@ static int ip3gen(struct chan *c, int i, struct dir *dp)
 
        cv = chan2conv(c);
        if (cv->owner == NULL)
-               kstrdup(&cv->owner, eve);
+               kstrdup(&cv->owner, eve.name);
        mkqid(&q, QID(PROTO(c->qid), CONV(c->qid), i), 0, QTFILE);
 
        switch (i) {
@@ -153,7 +157,9 @@ static int ip3gen(struct chan *c, int i, struct dir *dp)
                        return founddevdir(c, q, "err", qlen(cv->eq),
                                                           cv->owner, perm, dp);
                case Qlisten:
-                       return founddevdir(c, q, "listen", 0, cv->owner, cv->perm, dp);
+                       perm = cv->perm;
+                       perm |= cv->incall ? DMREADABLE : 0;
+                       return founddevdir(c, q, "listen", 0, cv->owner, perm, dp);
                case Qlocal:
                        p = "local";
                        break;
@@ -432,6 +438,8 @@ static struct chan *ipopen(struct chan *c, int omode)
                        iprouteropen(f);
                        break;
                case Qiproute:
+                       c->synth_buf = kpages_zalloc(IPROUTE_LEN, MEM_WAIT);
+                       routeread(f, c->synth_buf, 0, IPROUTE_LEN);
                        break;
                case Qtopdir:
                case Qprotodir:
@@ -447,6 +455,7 @@ static struct chan *ipopen(struct chan *c, int omode)
                case Qsnoop:
                        if (omode & O_WRITE)
                                error(EPERM, ERROR_FIXME);
+                       /* might be racy.  note the lack of a proto lock, unlike Qdata */
                        p = f->p[PROTO(c->qid)];
                        cv = p->conv[CONV(c->qid)];
                        if (strcmp(ATTACHER(c), cv->owner) != 0 && !iseve())
@@ -464,7 +473,7 @@ static struct chan *ipopen(struct chan *c, int omode)
                        qunlock(&p->qlock);
                        poperror();
                        if (cv == NULL) {
-                               error(ENODEV, ERROR_FIXME);
+                               error(ENODEV, "Null conversation from Fsprotoclone");
                                break;
                        }
                        mkqid(&c->qid, QID(p->x, cv->x, Qctl), 0, QTFILE);
@@ -609,7 +618,7 @@ static int ipwstat(struct chan *c, uint8_t * dp, int n)
                error(EPERM, ERROR_FIXME);
        if (!emptystr(d->uid))
                kstrdup(&cv->owner, d->uid);
-       if (d->mode != ~0UL)
+       if (d->mode != -1)
                cv->perm = d->mode & 0777;
        poperror();
        kfree(d);
@@ -633,9 +642,11 @@ static char *ipchaninfo(struct chan *ch, char *ret, size_t ret_l)
                case Qdata:
                        proto = f->p[PROTO(ch->qid)];
                        conv = proto->conv[CONV(ch->qid)];
-                       snprintf(ret, ret_l, "Qdata, %s, proto %s, conv idx %d, rq len %d, wq len %d",
+                       snprintf(ret, ret_l,
+                                "Qdata, %s, proto %s, conv idx %d, rq len %d, wq len %d, total read %llu",
                                 SLIST_EMPTY(&conv->data_taps) ? "untapped" : "tapped",
-                                proto->name, conv->x, qlen(conv->rq), qlen(conv->wq));
+                                proto->name, conv->x, qlen(conv->rq), qlen(conv->wq),
+                                        q_bytes_read(conv->rq));
                        break;
                case Qarp:
                        ret = "Qarp";
@@ -646,9 +657,10 @@ static char *ipchaninfo(struct chan *ch, char *ret, size_t ret_l)
                case Qlisten:
                        proto = f->p[PROTO(ch->qid)];
                        conv = proto->conv[CONV(ch->qid)];
-                       snprintf(ret, ret_l, "Qlisten, %s proto %s, conv idx %d",
+                       snprintf(ret, ret_l,
+                                "Qlisten, %s proto %s, conv idx %d, has %sincalls",
                                 SLIST_EMPTY(&conv->listen_taps) ? "untapped" : "tapped",
-                                proto->name, conv->x);
+                                proto->name, conv->x, conv->incall ? "" : "no ");
                        break;
                case Qlog:
                        ret = "Qlog";
@@ -668,6 +680,7 @@ static char *ipchaninfo(struct chan *ch, char *ret, size_t ret_l)
 
 static void closeconv(struct conv *cv)
 {
+       ERRSTACK(1);
        struct conv *nc;
        struct Ipmulti *mp;
 
@@ -677,7 +690,10 @@ static void closeconv(struct conv *cv)
                qunlock(&cv->qlock);
                return;
        }
-
+       if (waserror()) {
+               qunlock(&cv->qlock);
+               nexterror();
+       }
        /* close all incoming calls since no listen will ever happen */
        for (nc = cv->incall; nc; nc = cv->incall) {
                cv->incall = nc->next;
@@ -693,9 +709,12 @@ static void closeconv(struct conv *cv)
 
        cv->r = NULL;
        cv->rgen = 0;
+       if (cv->state == Bypass)
+               undo_proto_qio_bypass(cv);
        cv->p->close(cv);
        cv->state = Idle;
        qunlock(&cv->qlock);
+       poperror();
 }
 
 static void ipclose(struct chan *c)
@@ -725,6 +744,10 @@ static void ipclose(struct chan *c)
                        if (c->flag & COPEN)
                                atomic_dec(&f->p[PROTO(c->qid)]->conv[CONV(c->qid)]->snoopers);
                        break;
+               case Qiproute:
+                       if (c->flag & COPEN)
+                               kpages_free(c->synth_buf, IPROUTE_LEN);
+                       break;
        }
        kfree(((struct IPaux *)c->aux)->owner);
        kfree(c->aux);
@@ -742,7 +765,6 @@ static long ipread(struct chan *ch, void *a, long n, int64_t off)
        long rv;
        struct Fs *f;
        uint32_t offset = off;
-       size_t sofar;
 
        f = ipfs[ch->dev];
 
@@ -759,7 +781,7 @@ static long ipread(struct chan *ch, void *a, long n, int64_t off)
                case Qndb:
                        return readstr(offset, a, n, f->ndb);
                case Qiproute:
-                       return routeread(f, a, offset, n);
+                       return readmem(offset, a, n, ch->synth_buf, IPROUTE_LEN);
                case Qiprouter:
                        return iprouterread(f, a, n);
                case Qipselftab:
@@ -802,7 +824,10 @@ static long ipread(struct chan *ch, void *a, long n, int64_t off)
                        buf = kzmalloc(Statelen, 0);
                        x = f->p[PROTO(ch->qid)];
                        c = x->conv[CONV(ch->qid)];
-                       sofar = (*x->state) (c, buf, Statelen - 2);
+                       if (c->state == Bypass)
+                               snprintf(buf, Statelen, "Bypassed\n");
+                       else
+                               (*x->state)(c, buf, Statelen - 2);
                        rv = readstr(offset, p, n, buf);
                        kfree(buf);
                        return rv;
@@ -872,7 +897,8 @@ static void setluniqueport(struct conv *c, int lport)
                        break;
                if (xp == c)
                        continue;
-               if ((xp->state == Connected || xp->state == Announced)
+               if ((xp->state == Connected || xp->state == Announced
+                                           || xp->state == Bypass)
                        && xp->lport == lport
                        && xp->rport == c->rport
                        && ipcmp(xp->raddr, c->raddr) == 0
@@ -1022,12 +1048,18 @@ void Fsstdconnect(struct conv *c, char *argv[], int argc)
                        break;
        }
 
+       /* TODO: why is IPnoaddr (in v6 format, equivalent to v6Unspecified)
+        * treated as a v4 address? */
        if ((memcmp(c->raddr, v4prefix, IPv4off) == 0 &&
                 memcmp(c->laddr, v4prefix, IPv4off) == 0)
                || ipcmp(c->raddr, IPnoaddr) == 0)
                c->ipversion = V4;
        else
                c->ipversion = V6;
+       /* Linux has taught people to use zeros for local interfaces.  TODO: We
+        * might need this for v6 in the future. */
+       if (!ipcmp(c->raddr, IPv4_zeroes))
+               ipmove(c->raddr, IPv4_loopback);
 }
 
 /*
@@ -1136,6 +1168,136 @@ static void bindctlmsg(struct Proto *x, struct conv *c, struct cmdbuf *cb)
                x->bind(c, cb->f, cb->nf);
 }
 
+/* Helper, called by protocols to use the bypass.
+ *
+ * This is a bit nasty due to the overall nastiness of #ip.  We need to lock
+ * before checking the state and hold the qlock throughout, because a concurrent
+ * closeconv() could tear down the bypass.  Specifically, it could free the
+ * bypass queues.  The root issue is that conversation lifetimes are not managed
+ * well.
+ *
+ * If we fail, it's our responsibility to consume (free) the block(s). */
+void bypass_or_drop(struct conv *cv, struct block *bp)
+{
+       qlock(&cv->qlock);
+       if (cv->state == Bypass)
+               qpass(cv->rq, bp);
+       else
+               freeblist(bp);
+       qunlock(&cv->qlock);
+}
+
+/* Push the block directly to the appropriate ipoput function.
+ *
+ * It's the protocol's responsibility (and thus ours here) to make sure there is
+ * at least the right amount of the IP header in the block (ipoput{4,6} assumes
+ * it has the right amount, and the other protocols account for the IP header in
+ * their own header).
+ *
+ * For the TTL and TOS, we just use the default ones.  If we want, we could look
+ * into the actual block and see what the user wanted, though we're bypassing
+ * the protocol layer, not the IP layer. */
+static void proto_bypass_kick(void *arg, struct block *bp)
+{
+       struct conv *cv = (struct conv*)arg;
+       uint8_t vers_nibble;
+       struct Fs *f;
+
+       f = cv->p->f;
+
+       bp = pullupblock(bp, 1);
+       if (!bp)
+               error(EINVAL, "Proto bypass unable to pullup a byte!");
+       vers_nibble = *(uint8_t*)bp->rp & 0xf0;
+       switch (vers_nibble) {
+       case IP_VER4:
+               bp = pullupblock(bp, IPV4HDR_LEN);
+               if (!bp)
+                       error(EINVAL, "Proto bypass unable to pullup v4 header");
+               ipoput4(f, bp, FALSE, MAXTTL, DFLTTOS, NULL);
+               break;
+       case IP_VER6:
+               bp = pullupblock(bp, IPV6HDR_LEN);
+               if (!bp)
+                       error(EINVAL, "Proto bypass unable to pullup v6 header");
+               ipoput6(f, bp, FALSE, MAXTTL, DFLTTOS, NULL);
+               break;
+       default:
+               error(EINVAL, "Proto bypass block had unknown IP version 0x%x",
+                     vers_nibble);
+       }
+}
+
+/* Sets up cv for the protocol bypass.  We use different queues for two reasons:
+ * 1) To be protocol independent.  For instance, TCP and UDP could use very
+ * different QIO styles.
+ * 2) To set up our own kick/bypass method.  Note how udpcreate() and this
+ * function use qbypass() (just blast it out), while TCP uses qopen() with a
+ * kick.  TCP still follows queuing discipline.
+ *
+ * It's like we are our own protocol, the bypass protocol, when it comes to how
+ * we interact with qio.  The conv still is of the real protocol type (e.g.
+ * TCP).
+ *
+ * Note that we can't free the old queues.  The way #ip works, the queues are
+ * created when the conv is created, but the conv is never freed.  It's like a
+ * slab allocator that never frees objects, but just reinitializes them a
+ * little.
+ *
+ * For the queues, we're basically like UDP:
+ * - We take packets for rq and drop on overflow.
+ * - rq is also Qmsg, but we also have Qcoalesce, to ignore zero-len blocks
+ * - We kick for our outbound (wq) messages.
+ *
+ * Note that Qmsg can drop parts of packets.  It's up to the user to read
+ * enough.  If they didn't read enough, the extra is dropped.  This is similar
+ * to SOCK_DGRAM and recvfrom().  Minus major changes, there's no nice way to
+ * get individual messages with read().  Userspace using the bypass will need to
+ * find out the MTU of the NIC the IP stack is attached to, and make sure to
+ * read in at least that amount each time. */
+static void setup_proto_qio_bypass(struct conv *cv)
+{
+       cv->rq_save = cv->rq;
+       cv->wq_save = cv->wq;
+       cv->rq = qopen(BYPASS_QMAX, Qmsg | Qcoalesce, 0, 0);
+       cv->wq = qbypass(proto_bypass_kick, cv);
+}
+
+static void undo_proto_qio_bypass(struct conv *cv)
+{
+       qfree(cv->rq);
+       qfree(cv->wq);
+       cv->rq = cv->rq_save;
+       cv->wq = cv->wq_save;
+       cv->rq_save = NULL;
+       cv->wq_save = NULL;
+}
+
+void Fsstdbypass(struct conv *cv, char *argv[], int argc)
+{
+       memset(cv->raddr, 0, sizeof(cv->raddr));
+       cv->rport = 0;
+       switch (argc) {
+       case 2:
+               setladdrport(cv, argv[1], 1);
+               break;
+       default:
+               error(EINVAL, "Bad args (was %d, need 2) to bypass", argc);
+       }
+}
+
+static void bypassctlmsg(struct Proto *x, struct conv *cv, struct cmdbuf *cb)
+{
+       if (!x->bypass)
+               error(EFAIL, "Protocol %s does not support bypass", x->name);
+       /* The protocol needs to set the port (usually by calling Fsstdbypass) and
+        * then do whatever it needs to make sure it can find the conv again during
+        * receive (usually by adding to a hash table). */
+       x->bypass(cv, cb->f, cb->nf);
+       setup_proto_qio_bypass(cv);
+       cv->state = Bypass;
+}
+
 static void shutdownctlmsg(struct conv *cv, struct cmdbuf *cb)
 {
        if (cb->nf < 2)
@@ -1177,6 +1339,22 @@ static void ttlctlmsg(struct conv *c, struct cmdbuf *cb)
                c->ttl = atoi(cb->f[1]);
 }
 
+/* Binds a conversation, as if the user wrote "bind *" into ctl. */
+static void autobind(struct conv *cv)
+{
+       ERRSTACK(1);
+       struct cmdbuf *cb;
+
+       cb = parsecmd("bind *", 7);
+       if (waserror()) {
+               kfree(cb);
+               nexterror();
+       }
+       bindctlmsg(cv->p, cv, cb);
+       poperror();
+       kfree(cb);
+}
+
 static long ipwrite(struct chan *ch, void *v, long n, int64_t off)
 {
        ERRSTACK(1);
@@ -1197,6 +1375,10 @@ static long ipwrite(struct chan *ch, void *v, long n, int64_t off)
                case Qdata:
                        x = f->p[PROTO(ch->qid)];
                        c = x->conv[CONV(ch->qid)];
+                       /* connection-less protocols (UDP) can write without manually
+                        * binding. */
+                       if (c->lport == 0)
+                               autobind(c);
                        if (ch->flag & O_NONBLOCK)
                                qwrite_nonblock(c->wq, a, n);
                        else
@@ -1230,6 +1412,8 @@ static long ipwrite(struct chan *ch, void *v, long n, int64_t off)
                                announcectlmsg(x, c, cb);
                        else if (strcmp(cb->f[0], "bind") == 0)
                                bindctlmsg(x, c, cb);
+                       else if (strcmp(cb->f[0], "bypass") == 0)
+                               bypassctlmsg(x, c, cb);
                        else if (strcmp(cb->f[0], "shutdown") == 0)
                                shutdownctlmsg(c, cb);
                        else if (strcmp(cb->f[0], "ttl") == 0)
@@ -1480,7 +1664,9 @@ retry:
                if (c == NULL) {
                        c = kzmalloc(sizeof(struct conv), 0);
                        if (c == NULL)
-                               error(ENOMEM, ERROR_FIXME);
+                               error(ENOMEM,
+                                     "conv kzmalloc(%d, 0) failed in Fsprotoclone",
+                                     sizeof(struct conv));
                        qlock_init(&c->qlock);
                        qlock_init(&c->listenq);
                        rendez_init(&c->cr);
@@ -1495,7 +1681,9 @@ retry:
                                c->ptcl = kzmalloc(p->ptclsize, 0);
                                if (c->ptcl == NULL) {
                                        kfree(c);
-                                       error(ENOMEM, ERROR_FIXME);
+                                       error(ENOMEM,
+                                             "ptcl kzmalloc(%d, 0) failed in Fsprotoclone",
+                                             p->ptclsize);
                                }
                        }
                        *pp = c;