oprofile: preliminary design for queues between cores and device
authorRonald G. Minnich <rminnich@google.com>
Mon, 12 May 2014 15:14:54 +0000 (08:14 -0700)
committerRonald G. Minnich <rminnich@google.com>
Mon, 12 May 2014 15:14:54 +0000 (08:14 -0700)
Each core will put its results in a block. The per-cpu-buffer
contains a pointer to the block and an offset. Since we know
the block size, reserving space in the block and putting data
into it is easy.

The long term plan is that once the block is out of
room, the per-core code pushes it onto the per-core
fullqueue. The timer code, which runs on core 0, pulls
full blocks off the per-core full queue and pushes
empty blocks onto the per-core empty queue. In this way,
the cores run with very little overhead for saving data
(pointer/offset) and the messy work gets done on core "0".

Further, the per-core code can decide, for an even, that it's been
a long time since data has been pushed and push it even when the
block is not full. We may in the end want an interrupt at, say, once
per second to make sure stale data does not just hang around.

I'm open to changes and improvements in this idea; consider it
a straw man. We should try to work out what we want this week
however.

Signed-off-by: Ronald G. Minnich <rminnich@google.com>
kern/src/oprofile/cpu_buffer.c
kern/src/oprofile/cpu_buffer.h
scripts/RUN

index e719961..fff778e 100644 (file)
@@ -33,7 +33,7 @@ struct queue *opq;
 
 /* this is run from core 0 for all cpu buffers. */
 static void wq_sync_buffer(void);
-unsigned long oprofile_cpu_buffer_size = 32;
+unsigned long oprofile_cpu_buffer_size = 65536;
 unsigned long oprofile_backtrace_depth = 8;
 
 #define DEFAULT_TIMER_EXPIRE (HZ / 10)
@@ -109,10 +109,13 @@ void free_cpu_buffers(void)
 
 int alloc_cpu_buffers(void)
 {
+       /* should probably start using waserror() here. The fail stuff just gets
+        * ugly.
+        */
        int i;
        unsigned long buffer_size = oprofile_cpu_buffer_size;
        unsigned long byte_size = buffer_size * (sizeof(struct op_sample) +
-                                                                                        RB_EVENT_HDR_SIZE);
+                                                RB_EVENT_HDR_SIZE);
        /* this can get called lots of times. Things might have been freed.
         * So be careful.
         */
@@ -150,6 +153,8 @@ int alloc_cpu_buffers(void)
                        b->backtrace_aborted = 0;
                        b->sample_invalid_eip = 0;
                        b->cpu = i;
+                       b->fullqueue = qopen(1024, Qmsg, NULL, NULL);
+                       b->emptyqueue = qopen(1024, Qmsg, NULL, NULL);
                }
        }
 
@@ -192,22 +197,32 @@ struct op_sample *op_cpu_buffer_read_entry(struct op_entry *entry, int cpu)
        return NULL;
 }
 
-static struct block *op_cpu_buffer_write_reserve(struct op_entry *entry,
-                                                                                                int size)
+static struct block *op_cpu_buffer_write_reserve(struct oprofile_cpu_buffer *cpu_buf,
+       struct op_entry *entry, int size)
 {
        struct block *b;
-
-       b = allocb(sizeof(struct op_sample) +
-                          size * sizeof(entry->sample->data[0]));
-       if (!b) {
-               printk("%s: fail\n", __func__);
-               return NULL;
+       int totalsize = sizeof(struct op_sample) +
+               size * sizeof(entry->sample->data[0]);
+
+       b = cpu_buf->block;
+       /* we might have run out. */
+       if ((! b) || (b->lim - b->wp) < size) {
+               if (b)
+                       qbwrite(opq, b);
+               /* For now. Later, we will grab a block off the
+                * emptyblock queue.
+                */
+               cpu_buf->block = b = allocb(oprofile_cpu_buffer_size);
+               if (!b) {
+                       printk("%s: fail\n", __func__);
+                       return NULL;
+               }
        }
        entry->sample = (void *)b->wp;
        entry->size = size;
        entry->data = entry->sample->data;
 
-       b->wp += sizeof(struct op_sample) + size * sizeof(entry->sample->data[0]);
+       b->wp += totalsize;
        return b;
 
 }
@@ -260,7 +275,7 @@ op_add_code(struct oprofile_cpu_buffer *cpu_buf, unsigned long backtrace,
        else
                size = 0;
 
-       b = op_cpu_buffer_write_reserve(&entry, size);
+       b = op_cpu_buffer_write_reserve(cpu_buf, &entry, size);
 
        entry.sample->eip = ESCAPE_CODE;
        entry.sample->event = flags;
@@ -268,7 +283,6 @@ op_add_code(struct oprofile_cpu_buffer *cpu_buf, unsigned long backtrace,
        if (size)
                op_cpu_buffer_add_data(&entry, (unsigned long)proc);
 
-       qbwrite(opq, b);        /* note: out of our hands now. Don't free. */
        poperror();
        return 0;
 }
@@ -288,12 +302,11 @@ op_add_sample(struct oprofile_cpu_buffer *cpu_buf,
                return 1;
        }
 
-       b = op_cpu_buffer_write_reserve(&entry, 0);
+       b = op_cpu_buffer_write_reserve(cpu_buf, &entry, 0);
 
        sample = entry.sample;
        sample->eip = pc;
        sample->event = event;
-       qbwrite(opq, b);
        poperror();
        return 0;
 }
@@ -434,14 +447,13 @@ oprofile_write_reserve(struct op_entry *entry,
        if (op_add_code(cpu_buf, 0, is_kernel, current))
                goto fail;
 
-       b = op_cpu_buffer_write_reserve(entry, size + 2);
+       b = op_cpu_buffer_write_reserve(cpu_buf, entry, size + 2);
        sample = entry->sample;
        sample->eip = ESCAPE_CODE;
        sample->event = 0;      /* no flags */
 
        op_cpu_buffer_add_data(entry, code);
        op_cpu_buffer_add_data(entry, pc);
-       qbwrite(opq, b);
        poperror();
        return;
 fail:
index 56e5907..f7c725a 100644 (file)
@@ -52,6 +52,18 @@ struct oprofile_cpu_buffer {
        unsigned long backtrace_aborted;
        unsigned long sample_invalid_eip;
        int cpu;
+       struct block *block;
+       /* long term plan: when we fill the block,
+        * we write it to fullblock, and pull a
+        * freeblock from the emptyblock queue. 
+        * The thread that pulls fullbocks and
+        * allocates emptyblocks is timer-driven.
+        * Or, barret will make me use his queues,
+        * which is also fine; I just find the queue
+        * functions convenient because they interface to
+        * the dev code so easily.
+        */
+       struct queue *fullqueue, *emptyqueue;
 };
 
 struct op_sample *op_cpu_buffer_read_entry(struct op_entry *entry, int cpu);
index 1307880..e817aa3 100644 (file)
@@ -1,5 +1,5 @@
-#sudo qemu-system-x86_64 -s -cpu core2duo,+hypervisor,+vmx -smp 4 -m 2048 -nographic  \
-sudo qemu-system-x86_64 -s -cpu kvm64,+vmx -smp 4 -m 2048 -nographic  \
+#sudo qemu-system-x86_64 -s -cpu core2duo,+hypervisor,+vmx -smp 3 -m 2048 -nographic  \
+sudo qemu-system-x86_64 -s -cpu kvm64,+vmx -smp 8 -m 2048 -nographic  \
 --machine pc,accel=kvm \
 -net nic,model=rtl8139 \
 -net user,hostfwd=tcp::5555-:23 \