Fix extra decref of shared_page
[akaros.git] / kern / src / ceq.c
1 /* Copyright (c) 2015 Google Inc.
2  * Barret Rhoden <brho@cs.berkeley.edu>
3  * See LICENSE for details.
4  *
5  * Coalescing Event Queue: encapuslates the essence of epoll/kqueue in shared
6  * memory: a dense array of sticky status bits.
7  *
8  * Kernel side (producer)
9  *
10  * All of the printks are just us helping the user debug their CEQs. */
11
12 #include <ceq.h>
13 #include <process.h>
14 #include <stdio.h>
15 #include <umem.h>
16
17 static void error_addr(struct ceq *ceq, struct proc *p, void *addr)
18 {
19         printk("[kernel] Invalid ceq (%p) bad addr %p for proc %d\n", ceq,
20                addr, p->pid);
21 }
22
23 void send_ceq_msg(struct ceq *ceq, struct proc *p, struct event_msg *msg)
24 {
25         struct ceq_event *ceq_ev;
26         int32_t *ring_slot;
27         unsigned long my_slot;
28         int loops = 0;
29         #define NR_RING_TRIES 10
30
31         /* should have been checked by the kernel func that called us */
32         assert(is_user_rwaddr(ceq, sizeof(struct ceq)));
33         if (msg->ev_type >= ceq->nr_events) {
34                 printk("[kernel] CEQ %p too small.  Wanted %d, had %d\n", ceq,
35                        msg->ev_type, ceq->nr_events);
36                 return;
37         }
38         /* ACCESS_ONCE, prevent the compiler from rereading ceq->events later, and
39          * possibly getting a new, illegal version after our check */
40         ceq_ev = &(ACCESS_ONCE(ceq->events))[msg->ev_type];
41         if (!is_user_rwaddr(ceq_ev, sizeof(struct ceq_event))) {
42                 error_addr(ceq, p, ceq);
43                 return;
44         }
45         /* ideally, we'd like the blob to be posted after the coal, so that the
46          * 'reason' for the blob is present when the blob is.  but we can't
47          * guarantee that.  after we write the coal, the cons could consume that.
48          * then the next time it looks at us, it could just see the blob - so
49          * there's no good way to keep them together.  the user will just have to
50          * deal with it.  in that case, we might as well do it first, to utilize the
51          * atomic ops's memory barrier. */
52         ceq_ev->blob_data = (uint64_t)msg->ev_arg3;
53         switch (ceq->operation) {
54                 case (CEQ_OR):
55                         atomic_or(&ceq_ev->coalesce, msg->ev_arg2);
56                         break;
57                 case (CEQ_ADD):
58                         atomic_add(&ceq_ev->coalesce, msg->ev_arg2);
59                         break;
60                 default:
61                         printk("[kernel] CEQ %p invalid op%d\n", ceq, ceq->operation);
62                         return;
63         }
64         /* write before checking if we need to post (covered by the atomic) */
65         if (ceq_ev->idx_posted) {
66                 /* our entry was updated and posted was still set: we know the consumer
67                  * will still check it, so we can safely leave.  If we ever have exit
68                  * codes or something from send_*_msg, then we can tell the kernel to
69                  * not bother with INDIRS/IPIs/etc.  This is unnecessary now since
70                  * INDIRs are throttled */
71                 return;
72         }
73         /* at this point, we need to make sure the cons looks at our entry.  it may
74          * have already done so while we were mucking around, but 'poking' them to
75          * look again can't hurt */
76         ceq_ev->idx_posted = TRUE;
77         /* idx_posted write happens before the writes posting it.  the following
78          * atomic provides the cpu mb() */
79         cmb();
80         /* I considered checking the buffer for full-ness or the ceq overflow here.
81          * Those would be reads, which would require a wrmb() right above for every
82          * ring post, all for something we check for later anyways and for something
83          * that should be rare.  In return, when we are overflowed, which should be
84          * rare if the user sizes their ring buffer appropriately, we go through a
85          * little more hassle below. */
86         /* I tried doing this with fetch_and_add to avoid the while loop and picking
87          * a number of times to try.  The trick is that you need to back out, and
88          * could have multiple producers working on the same slot.  Although the
89          * overflow makes it okay for the producers idxes to be clobbered, it's not
90          * okay to have two producers on the same slot, since there'd only be one
91          * consumer.  Theoretically, you could have a producer delayed a long time
92          * that just clobbers an index at some point in the future, or leaves an
93          * index in the non-init state (-1).  It's a mess. */
94         do {
95                 cmb();  /* reread the indices */
96                 my_slot = atomic_read(&ceq->prod_idx);
97                 if (__ring_full(ceq->ring_sz, my_slot,
98                                 atomic_read(&ceq->cons_pub_idx))) {
99                         ceq->ring_overflowed = TRUE;
100                         return;
101                 }
102                 if (loops++ == NR_RING_TRIES) {
103                         ceq->ring_overflowed = TRUE;
104                         return;
105                 }
106         } while (!atomic_cas(&ceq->prod_idx, my_slot, my_slot + 1));
107         /* ring_slot is a user pointer, calculated by ring, my_slot, and sz */
108         ring_slot = &(ACCESS_ONCE(ceq->ring))[my_slot & (ceq->ring_sz - 1)];
109         if (!is_user_rwaddr(ring_slot, sizeof(int32_t))) {
110                 /* This is a serious user error.  We're just bailing out, and any
111                  * consumers might be spinning waiting on us to produce.  Probably not
112                  * though, since the ring slot is bad memory. */
113                 error_addr(ceq, p, ring_slot);
114                 return;
115         }
116         /* At this point, we have a valid slot */
117         *ring_slot = msg->ev_type;
118 }