Fixed recv function in the driver to process more than one packet per interrupt.
authorDavid Zhu <yuzhu@cs.berkeley.edu>
Tue, 12 Apr 2011 21:38:42 +0000 (14:38 -0700)
committerDavid Zhu <yuzhu@cs.berkeley.edu>
Mon, 2 Apr 2012 22:03:28 +0000 (15:03 -0700)
Also enabled processes to be blocked on a socket by using recvfrom.

UDP sending and receiving of regularly sized packets should work.
There are a bunch of potential issues. Mostly, performance related.
Locking and unlocking using spinlocks for each packet is going to hurt when performance
becomes an issue..

kern/arch/i686/e1000.c
kern/arch/i686/e1000.h
kern/include/kthread.h
kern/include/socket.h
kern/src/kthread.c
kern/src/net/ip.c
kern/src/net/pbuf.c
kern/src/net/udp.c
kern/src/socket.c
tests/udp_test.c

index 000984c..526ef7a 100644 (file)
@@ -50,8 +50,10 @@ uint32_t e1000_addr_size = 0;
 unsigned char device_mac[6];
 
 // Vars relating to the receive descriptor ring
+// pointer to receive descriptors
 struct e1000_rx_desc *rx_des_kva;
 unsigned long rx_des_pa;
+// current rx index
 uint32_t e1000_rx_index = 0;
 
 
@@ -79,6 +81,9 @@ spinlock_t packet_buffers_lock;
 uint16_t device_id;
 extern int (*send_frame)(const char *CT(len) data, size_t len);
 
+// compat defines that make transitioning easier
+#define E1000_RX_DESC(x) rx_des_kva[x]
+
 void e1000_dump_rx() {
 
        for (int i = 0; i < 10; i++) {
@@ -395,8 +400,8 @@ void e1000_setup_descriptors() {
        // Must be 16 byte aligned
 
        // How many pages do we need?
-        uint32_t num_rx_pages = ROUNDUP(NUM_RX_DESCRIPTORS * sizeof(struct e1000_rx_desc), PGSIZE) / PGSIZE;
-        uint32_t num_tx_pages = ROUNDUP(NUM_TX_DESCRIPTORS * sizeof(struct e1000_tx_desc), PGSIZE) / PGSIZE;
+  uint32_t num_rx_pages = ROUNDUP(NUM_RX_DESCRIPTORS * sizeof(struct e1000_rx_desc), PGSIZE) / PGSIZE;
+  uint32_t num_tx_pages = ROUNDUP(NUM_TX_DESCRIPTORS * sizeof(struct e1000_tx_desc), PGSIZE) / PGSIZE;
        
        // Get the pages
        rx_des_kva = get_cont_pages(LOG2_UP(num_rx_pages), 0);
@@ -612,6 +617,7 @@ void e1000_reset() {
 }
 
 void e1000_irq_enable() {
+       printk("e1000 enabled\n");
        e1000_wr32(E1000_IMS, IMS_ENABLE_MASK);
        E1000_WRITE_FLUSH();
 }
@@ -641,12 +647,15 @@ void e1000_setup_interrupts() {
 #ifdef __CONFIG_ENABLE_MPTABLES__
        /* TODO: this should be for any IOAPIC EOI, not just MPTABLES */
        ioapic_route_irq(e1000_irq, E1000_IRQ_CPU);     
+       printk("ioapic rout\n");
+
 #else 
        // This will route the interrupts automatically to CORE 0
        // Call send_kernel_message if you want to route them somewhere else
        pic_unmask_irq(e1000_irq);
        unmask_lapic_lvt(LAPIC_LVT_LINT0);
        enable_irq();
+       printk("picroute\n");
 #endif
 
        return;
@@ -670,14 +679,14 @@ void e1000_interrupt_handler(trapframe_t *tf, void* data) {
        e1000_wr32(E1000_IMC, ~0);
        E1000_WRITE_FLUSH();
 
-       printk("Interrupt status: %x\n", icr);
+       //printk("Interrupt status: %x\n", icr);
 
        if ((icr & E1000_ICR_INT_ASSERTED) && (icr & E1000_ICR_RXT0)){
                e1000_interrupt_debug("---->Packet Received\n");
 #ifdef __CONFIG_SOCKET__
 //#if 0
-               struct pbuf *pb = e1000_recv_pbuf();
-               schedule_pb(pb);
+               e1000_clean_rx_irq();
+               // e1000_recv_pbuf(); // really it is now performing the function of rx_clean
 #else
                e1000_handle_rx_packet();
 #endif
@@ -690,13 +699,14 @@ void process_pbuf(struct trapframe *tf, uint32_t srcid, long a0, long a1, long a
                warn("pbuf came from a different core\n");
        /* assume it is an ip packet */
        struct pbuf* pb = (struct pbuf*) a0;
-       printk("processing pbuf \n");
+       //printk("processing pbuf \n");
        /*TODO: check checksum and drop */
        /*check packet type*/
        struct ethernet_hdr *ethhdr = (struct ethernet_hdr *) pb->payload;
-       printk("start of eth %p \n", pb->payload);
-       print_pbuf(pb);
+       //printk("start of eth %p \n", pb->payload);
+       //print_pbuf(pb);
        if (memcmp(ethhdr->dst_mac, device_mac, 6)){
+               e1000_debug("mac address do not match, pbuf freed \n");
                pbuf_free(pb);
        }
        switch(htons(ethhdr->eth_type)){
@@ -715,7 +725,6 @@ void process_pbuf(struct trapframe *tf, uint32_t srcid, long a0, long a1, long a
 }
 
 static void schedule_pb(struct pbuf* pb) {
-       printk("scheduled pb \n");
        /* routine kernel message is kind of heavy weight, because it records src/dst etc */
        /* TODO: consider a core-local chain of pbufs */
        // using core 3 for network stuff..XXX
@@ -728,6 +737,7 @@ void e1000_handle_rx_packet() {
        
        uint16_t packet_size;
        uint32_t status;
+       // find rx descriptor head
        uint32_t head = e1000_rr32(E1000_RDH);
 
        //printk("Current head is: %x\n", e1000_rr32(E1000_RDH));
@@ -879,17 +889,47 @@ void e1000_handle_rx_packet() {
        
        return;
 }
-#if 0
-static int e1000_clean_rx_irq() {
+
+static void e1000_clean_rx_irq() {
+       // e1000_rx_index is the last one that we have processed
        uint32_t i= e1000_rx_index;
-       // recv head
+       // E1000 RDH is the last descriptor written by the hardware
        uint32_t head = e1000_rr32(E1000_RDH);
-       struct e1000_rx_desc *rx_desc, *next_rxd;
+       uint32_t length = 0;
+       struct e1000_rx_desc *rx_desc =  &(E1000_RX_DESC(i));
+
+       // what happens when i go around the ring? 
+       while (rx_desc->status & E1000_RXD_STAT_DD){
+               struct pbuf* pb;
+               uint8_t status;
+               rx_desc = &rx_des_kva[i];
+               // buffer_info = &rx_des_kva[i];
+               status = rx_desc->status;
+               pb = pbuf_alloc(PBUF_RAW, 0 , PBUF_MTU);
+#if ETH_PAD_SIZE
+               pbuf_header(pb, -ETH_PAD_SIZE); /* drop the padding word */
+#endif
+               // fragment size
+               length = le16_to_cpu(rx_desc->length);
 
-       rx_desc = rx_des_kva[i];
-       buffer_info = rx_des_kva[i]
+               length -= 4;
+
+               memcpy(pb->payload, KADDR(E1000_RX_DESC(i).buffer_addr), length);
+               // skb_put(skb, length);
+               pb->len = length;
+               pb->tot_len = length;
+               schedule_pb(pb);
+               // do all the error handling 
+next_desc:
+               // this replaces e1000_set_rx_descriptor
+               rx_desc->status = 0;
+               if (++i == NUM_RX_DESCRIPTORS) i = 0;
+               rx_desc = &(E1000_RX_DESC(i)); 
+       }
+       //setting e1000_RDH?
+               printk ("cleaned index %d to %d \n", e1000_rx_index, i-1);
+               e1000_rx_index = i;
 }
-#endif
 
 struct pbuf* e1000_recv_pbuf(void) {
        uint16_t packet_size;
@@ -899,14 +939,12 @@ struct pbuf* e1000_recv_pbuf(void) {
 
        printk("Current head is: %x\n", e1000_rr32(E1000_RDH));
        printk("Current tail is: %x\n", e1000_rr32(E1000_RDT));
-       
+       // e1000_rx_index = cleaned
        // If the HEAD is where we last processed, no new packets.
        if (head == e1000_rx_index) {
                e1000_frame_debug("-->Nothing to process. Returning.");
                return NULL;
        }
-
-       
        // Set our current descriptor to where we last left off.
        uint32_t rx_des_loop_cur = e1000_rx_index;
        uint32_t frame_size = 0;
@@ -1011,9 +1049,170 @@ struct pbuf* e1000_recv_pbuf(void) {
 
        pb->len = copied;
        pb->tot_len = copied;
+       schedule_pb(pb);
        return pb;
 }
 
+#if 0
+
+int e1000_clean_rx(){
+       struct net_device *netdev = adapter->netdev;
+       struct pci_dev *pdev = adapter->pdev;
+       struct e1000_rx_desc *rx_desc, *next_rxd;
+       struct e1000_buffer *buffer_info, *next_buffer;
+       unsigned long flags;
+       uint32_t length;
+       uint8_t last_byte;
+       unsigned int i;
+       int cleaned_count = 0;
+       boolean_t cleaned = FALSE;
+       unsigned int total_rx_bytes=0, total_rx_packets=0;
+
+       i = rx_ring->next_to_clean;
+       // rx_desc is the same as rx_des_kva[rx_des_loop_cur]
+       rx_desc = E1000_RX_DESC(*rx_ring, i);
+       buffer_info = &rx_ring->buffer_info[i];
+
+       while (rx_desc->status & E1000_RXD_STAT_DD) {
+               struct sk_buff *skb;
+               u8 status;
+
+#ifdef CONFIG_E1000_NAPI
+               if (*work_done >= work_to_do)
+                       break;
+               (*work_done)++;
+#endif
+               status = rx_desc->status;
+               skb = buffer_info->skb;
+               buffer_info->skb = NULL;
+
+               prefetch(skb->data - NET_IP_ALIGN);
+
+               if (++i == rx_ring->count) i = 0;
+               next_rxd = E1000_RX_DESC(*rx_ring, i);
+               prefetch(next_rxd);
+
+               next_buffer = &rx_ring->buffer_info[i];
+
+               cleaned = TRUE;
+               cleaned_count++;
+               pci_unmap_single(pdev,
+                                buffer_info->dma,
+                                buffer_info->length,
+                                PCI_DMA_FROMDEVICE);
+
+               length = le16_to_cpu(rx_desc->length);
+
+               if (unlikely(!(status & E1000_RXD_STAT_EOP))) {
+                       /* All receives must fit into a single buffer */
+                       E1000_DBG("%s: Receive packet consumed multiple"
+                                 " buffers\n", netdev->name);
+                       /* recycle */
+                       buffer_info->skb = skb;
+                       goto next_desc;
+               }
+
+               if (unlikely(rx_desc->errors & E1000_RXD_ERR_FRAME_ERR_MASK)) {
+                       last_byte = *(skb->data + length - 1);
+                       if (TBI_ACCEPT(&adapter->hw, status,
+                                     rx_desc->errors, length, last_byte)) {
+                               spin_lock_irqsave(&adapter->stats_lock, flags);
+                               e1000_tbi_adjust_stats(&adapter->hw,
+                                                      &adapter->stats,
+                                                      length, skb->data);
+                               spin_unlock_irqrestore(&adapter->stats_lock,
+                                                      flags);
+                               length--;
+                       } else {
+                               /* recycle */
+                               buffer_info->skb = skb;
+                               goto next_desc;
+                       }
+               }
+
+               /* adjust length to remove Ethernet CRC, this must be
+                * done after the TBI_ACCEPT workaround above */
+               length -= 4;
+
+               /* probably a little skewed due to removing CRC */
+               total_rx_bytes += length;
+               total_rx_packets++;
+
+               /* code added for copybreak, this should improve
+                * performance for small packets with large amounts
+                * of reassembly being done in the stack */
+               if (length < copybreak) {
+                       struct sk_buff *new_skb =
+                           netdev_alloc_skb(netdev, length + NET_IP_ALIGN);
+                       if (new_skb) {
+                               skb_reserve(new_skb, NET_IP_ALIGN);
+                               memcpy(new_skb->data - NET_IP_ALIGN,
+                                      skb->data - NET_IP_ALIGN,
+                                      length + NET_IP_ALIGN);
+                               /* save the skb in buffer_info as good */
+                               buffer_info->skb = skb;
+                               skb = new_skb;
+                       }
+                       /* else just continue with the old one */
+               }
+               /* end copybreak code */
+               skb_put(skb, length);
+
+               /* Receive Checksum Offload */
+               e1000_rx_checksum(adapter,
+                                 (uint32_t)(status) |
+                                 ((uint32_t)(rx_desc->errors) << 24),
+                                 le16_to_cpu(rx_desc->csum), skb);
+
+               skb->protocol = eth_type_trans(skb, netdev);
+#ifdef CONFIG_E1000_NAPI
+               if (unlikely(adapter->vlgrp &&
+                           (status & E1000_RXD_STAT_VP))) {
+                       vlan_hwaccel_receive_skb(skb, adapter->vlgrp,
+                                                le16_to_cpu(rx_desc->special) &
+                                                E1000_RXD_SPC_VLAN_MASK);
+               } else {
+                       netif_receive_skb(skb);
+               }
+#else /* CONFIG_E1000_NAPI */
+               if (unlikely(adapter->vlgrp &&
+                           (status & E1000_RXD_STAT_VP))) {
+                       vlan_hwaccel_rx(skb, adapter->vlgrp,
+                                       le16_to_cpu(rx_desc->special) &
+                                       E1000_RXD_SPC_VLAN_MASK);
+               } else {
+                       netif_rx(skb);
+               }
+#endif /* CONFIG_E1000_NAPI */
+               netdev->last_rx = jiffies;
+
+next_desc:
+               rx_desc->status = 0;
+
+               /* return some buffers to hardware, one at a time is too slow */
+               if (unlikely(cleaned_count >= E1000_RX_BUFFER_WRITE)) {
+                       adapter->alloc_rx_buf(adapter, rx_ring, cleaned_count);
+                       cleaned_count = 0;
+               }
+
+               /* use prefetched values */
+               rx_desc = next_rxd;
+               buffer_info = next_buffer;
+       }
+       rx_ring->next_to_clean = i;
+
+       cleaned_count = E1000_DESC_UNUSED(rx_ring);
+       if (cleaned_count)
+               adapter->alloc_rx_buf(adapter, rx_ring, cleaned_count);
+
+       adapter->total_rx_packets += total_rx_packets;
+       adapter->total_rx_bytes += total_rx_bytes;
+       return cleaned;
+}
+}
+
+#endif
+
 int e1000_send_pbuf(struct pbuf *p) {
        int len = p->tot_len;
        // print_pbuf(p);
index 832a72b..6771651 100644 (file)
@@ -6,10 +6,15 @@
 #include <pmap.h>
 #include <arch/nic_common.h>
 #include <net/pbuf.h>
-
+#if 1
 #define e1000_debug(...)               printk(__VA_ARGS__)  
 #define e1000_interrupt_debug(...)     printk(__VA_ARGS__)  
 #define e1000_frame_debug(...)         printk(__VA_ARGS__)  
+#else
+#define e1000_debug(...)
+#define e1000_interrupt_debug(...)
+#define e1000_frame_debug(...)
+#endif
 
 #define E1000_IRQ_CPU          0
 
 // This should be in line with the setting of BSIZE in RCTL
 #define E1000_RX_MAX_BUFFER_SIZE 2048
 #define E1000_TX_MAX_BUFFER_SIZE 2048
+#if 0
+struct e1000_tx_ring {
+       /* pointer to the descriptor ring memory */
+       void *desc;
+       /* physical address of the descriptor ring */
+       dma_addr_t dma;
+       /* length of descriptor ring in bytes */
+       unsigned int size;
+       /* number of descriptors in the ring */
+       unsigned int count;
+       /* next descriptor to associate a buffer with */
+       unsigned int next_to_use;
+       /* next descriptor to check for DD status bit */
+       unsigned int next_to_clean;
+       /* array of buffer information structs */
+       struct e1000_buffer *buffer_info;
+
+       spinlock_t tx_lock;
+       uint16_t tdh;
+       uint16_t tdt;
+       boolean_t last_tx_tso;
+};
+
+struct e1000_rx_ring {
+       /* pointer to the descriptor ring memory */
+       void *desc;
+       /* physical address of the descriptor ring */
+       dma_addr_t dma;
+       /* length of descriptor ring in bytes */
+       unsigned int size;
+       /* number of descriptors in the ring */
+       unsigned int count;
+       /* next descriptor to associate a buffer with */
+       unsigned int next_to_use;
+       /* next descriptor to check for DD status bit */
+       unsigned int next_to_clean;
+       /* array of buffer information structs */
+       struct e1000_buffer *buffer_info;
+       /* arrays of page information for packet split */
+       struct e1000_ps_page *ps_page;
+       struct e1000_ps_page_dma *ps_page_dma;
+
+       /* cpu for rx queue */
+       int cpu;
+
+       uint16_t rdh;
+       uint16_t rdt;
+};
+struct e1000_adaptor{
+       struct e1000_tx_ring tx_ring;
+       struct e1000_rx_ring rx_ring;
+
+
+}
+#endif
 
 /* driver private functions */
 static uint32_t e1000_rr32(uint32_t offset);
@@ -55,6 +115,7 @@ void e1000_set_rx_descriptor(uint32_t des_num, uint8_t reset_buffer);
 void e1000_set_tx_descriptor(uint32_t des_num);
 int  e1000_send_frame(const char* data, size_t len);
 int e1000_send_pbuf(struct pbuf *p);
+static void e1000_clean_rx_irq();
 /* returns a chain of pbuf from the driver */
 struct pbuf* e1000_recv_pbuf();
 void process_pbuf(struct trapframe *tf, uint32_t srcid, long a0, long a1, long a2);
index 60108c0..39fbfaf 100644 (file)
@@ -57,7 +57,7 @@ static inline bool __down_sem(struct semaphore *sem, struct kthread *kthread)
 {
        bool retval = FALSE;
        spin_lock(&sem->lock);
-       if (sem->nr_signals-- <= 0) {
+       if (sem->nr_signals-- <= 0 && kthread != NULL) {
                /* Need to sleep */
                retval = TRUE;
                TAILQ_INSERT_TAIL(&sem->waiters, kthread, link);
@@ -75,6 +75,7 @@ static inline struct kthread *__up_sem(struct semaphore *sem, bool exactly_one)
        if (sem->nr_signals++ < 0) {
                /* could do something with 'priority' here */
                kthread = TAILQ_FIRST(&sem->waiters);
+               if (kthread == NULL) warn ("kthread is null\n");
                TAILQ_REMOVE(&sem->waiters, kthread, link);
                if (exactly_one)
                        assert(TAILQ_EMPTY(&sem->waiters));
index 88f8c1a..83821f7 100644 (file)
@@ -5,6 +5,7 @@
 #include <sys/queue.h>
 #include <atomic.h>
 #include <net/pbuf.h>
+#include <kthread.h>
 // Just a couple of AF types that we might support
 #define AF_UNSPEC      0
 #define AF_UNIX                1       /* Unix domain sockets          */
@@ -58,6 +59,7 @@ struct socket{
        void    *so_pcb;        /* protocol control block */
        struct pbuf_head recv_buff;
        struct pbuf_head send_buff;
+       struct semaphore sem;   /* semaphone to for a process to sleep on */
        
        //struct  vnet *so_vnet;      /* network stack instance */
        //struct  protosw *so_proto;  /* (a) protocol handle */
index 0427704..2ea8cea 100644 (file)
@@ -178,6 +178,7 @@ void restart_kthread(struct kthread *kthread)
        kthread->stacktop = current_stacktop;
        /* Only change current if we need to (the kthread was in process context) */
        if (kthread->proc) {
+       /* Only change current if we need to (the kthread was in process context) */
                /* Load our page tables before potentially decreffing cur_proc */
                lcr3(kthread->proc->env_cr3);
                /* Might have to clear out an existing current.  If they need to be set
@@ -187,6 +188,7 @@ void restart_kthread(struct kthread *kthread)
                /* We also transfer our counted ref from kthread->proc to cur_proc */
                pcpui->cur_proc = kthread->proc;
        }
+
        /* Tell the core which syscall we are running (if any) */
        assert(!pcpui->cur_sysc);       /* catch bugs, prev user should clear */
        pcpui->cur_sysc = kthread->sysc;
index b20fc99..86aed6e 100644 (file)
@@ -99,8 +99,8 @@ int ip_output(struct pbuf *p, struct in_addr *src, struct in_addr *dest, uint8_t
 int ip_input(struct pbuf *p) {
        uint32_t iphdr_hlen, iphdr_len;
        struct ip_hdr *iphdr = (struct ip_hdr *)p->payload;
-       printk("start of ip %p \n", p->payload);
-       print_pbuf(p);
+       //printk("start of ip %p \n", p->payload);
+       //print_pbuf(p);
        /* use that info to build arp table */
   if (iphdr->version != 4) {
                warn("ip version not 4!\n");
@@ -109,7 +109,7 @@ int ip_input(struct pbuf *p) {
        }
        iphdr_hlen = iphdr->hdr_len * 4;
        iphdr_len = ntohs(iphdr->packet_len);
-       printk("ip input coming from %x of size %d", ntohs(iphdr->dst_addr), iphdr_len);
+       // printk("ip input coming from %x of size %d", ntohs(iphdr->dst_addr), iphdr_len);
   /* header length exceeds first pbuf length, or ip length exceeds total pbuf length? */
   if ((iphdr_hlen > p->len) || (iphdr_len > p->tot_len)) {
     if (iphdr_hlen > p->len) {
@@ -146,7 +146,7 @@ int ip_input(struct pbuf *p) {
                pbuf_free(p);
        }
 
-       printk ("loc head %p, loc protocol %p\n", iphdr, &iphdr->protocol);
+       //printk ("loc head %p, loc protocol %p\n", iphdr, &iphdr->protocol);
        /* currently a noop, compared to the memory wasted, cutting out ipheader is not really saving much */
        // pbuf_realloc(p, iphdr_len);
        switch (iphdr->protocol) {
index 13573a4..88a3689 100644 (file)
@@ -185,6 +185,7 @@ bool pbuf_deref(struct pbuf *p){
 void attach_pbuf(struct pbuf *p, struct pbuf_head *ph){
        spin_lock_irqsave(&ph->lock);
        ph->qlen++;
+       pbuf_ref(p);
        STAILQ_INSERT_TAIL(&ph->pbuf_fifo, p, next);
        spin_unlock_irqsave(&ph->lock);
 }
@@ -308,7 +309,6 @@ pbuf_cat(struct pbuf *h, struct pbuf *t)
 int pbuf_header(struct pbuf *p, int delta){ // increase header size
        uint8_t type = p->type;
        void *payload = p->payload;
-       printk("delta %d \n", delta);
        if (p == NULL || delta == 0)
                return 0;
        // This assertion used to apply when len meant allocated space..
index 9c09477..3c2d27e 100644 (file)
@@ -124,10 +124,6 @@ int udp_sendto(struct udp_pcb *pcb, struct pbuf *p,
                printd("params src addr %x, dst addr %x, length %x \n", global_ip.s_addr, (dst_ip->s_addr), 
                                          q->tot_len);
 
-               uint32_t checksum = udp_sum_calc(q->tot_len, &(global_ip.s_addr), &(dst_ip->s_addr), false,
-                                                                                               q->payload);
-               printd ("method theirs %x \n", checksum);
-               
     udphdr->checksum = inet_chksum_pseudo(q, htonl(global_ip.s_addr), dst_ip->s_addr,
                                                                                         IPPROTO_UDP, q->tot_len);
                printd ("method ours %x\n", udphdr->checksum);
@@ -242,6 +238,12 @@ int udp_attach(struct pbuf *p, struct sock *socket) {
 /** TODO: think about combining udp_input and ip_input together */
 // TODO: figure out if we even need a PCB? or just socket buff. 
 // TODO: test out looking up pcbs.. since matching function may fail
+
+void wrap_restart_kthread(struct trapframe *tf, uint32_t srcid,
+                                       long a0, long a1, long a2){
+       restart_kthread((struct kthread*) a0);
+}
+
 int udp_input(struct pbuf *p){
        struct udp_hdr *udphdr;
 
@@ -256,7 +258,7 @@ int udp_input(struct pbuf *p){
                pbuf_free(p);
                return -1;
        }
-                       printk("start of udp %p\n", p->payload);
+       printk("start of udp %p\n", p->payload);
        udphdr = (struct udp_hdr *)p->payload;
        /* convert the src port and dst port to host order */
        src = ntohs(udphdr->src_port);
@@ -285,11 +287,18 @@ int udp_input(struct pbuf *p){
        }
   /* ignore SO_REUSE */
        if (pcb != NULL && pcb->pcbsock != NULL){
-               printk("ready to attach \n");
                /* For each in the pbuf chain, disconnect from the chain and add it to the
                 * recv_buff of the correct socket 
                 */ 
-               attach_pbuf(p, &pcb->pcbsock->recv_buff);
+               struct socket *sock = pcb->pcbsock;
+               attach_pbuf(p, &sock->recv_buff);
+               struct kthread *kthread;
+               // multiple people might be waiting on the socket here..
+               kthread = __up_sem(&(sock->sem), FALSE);
+               if (kthread) {
+                        send_kernel_message(core_id(), (amr_t)wrap_restart_kthread, (long)kthread, 0, 0,
+                                                                                                 KMSG_ROUTINE);
+               }
                // the attaching of pbuf should have increfed pbuf ref, so free is simply a decref
                pbuf_free(p);
        }
index 5bf35cc..9240abd 100644 (file)
@@ -71,6 +71,7 @@ struct socket* alloc_sock(int socket_family, int socket_type, int protocol){
        newsock->so_state = SS_ISDISCONNECTED;
        pbuf_head_init(&newsock->recv_buff);
        pbuf_head_init(&newsock->send_buff);
+       init_sem(&newsock->sem, 0);
        if (socket_type == SOCK_DGRAM){
                newsock->so_pcb = udp_new();
                /* back link */
@@ -229,6 +230,11 @@ intreg_t sys_sendto(struct proc *p_proc, int fd, const void *buffer, size_t leng
     error = send_iov(soc, iov, flags);
        #endif
 }
+
+/* UDP and TCP has different waiting semantics
+ * UDP requires any packet to be available. 
+ * TCP requires accumulation of certain size? 
+ */
 intreg_t sys_recvfrom(struct proc *p, int socket, void *restrict buffer, size_t length, int flags, struct sockaddr *restrict address, socklen_t *restrict address_len){
        struct socket* sock = getsocket(p, socket);     
        int copied = 0;
@@ -240,20 +246,23 @@ intreg_t sys_recvfrom(struct proc *p, int socket, void *restrict buffer, size_t
        if (sock->so_type == SOCK_DGRAM){
                struct pbuf_head *ph = &(sock->recv_buff);
                struct pbuf* buf = NULL;
-               /* TODO: busy poll the socket buffer for now */
-               while (buf == NULL){
+               buf = detach_pbuf(ph);
+               if (!buf){
+                       // about to sleep
+                       sleep_on(&sock->sem);
                        buf = detach_pbuf(ph);
-                       if (buf){
-                               copied = buf->len - sizeof(struct udp_hdr);
-                               if (copied > length)
-                                       copied = length;
+                       // Someone woke me up, there should be data..
+                       assert(buf);
+               } else {
+                       __down_sem(&sock->sem, NULL);
+               }
+                       copied = buf->len - sizeof(struct udp_hdr);
+                       if (copied > length)
+                               copied = length;
                        pbuf_header(buf, -UDP_HDR_SZ);
-                       printk("loc of payload %p\n", buf->payload);
                        // copy it to user space
                        returnval = memcpy_to_user_errno(p, buffer, buf->payload, copied);
-                       }
                }
-       }
        if (returnval < 0) 
                return -1;
        else
index ad2b767..88665a2 100644 (file)
@@ -9,6 +9,7 @@
 #include <string.h>
 #include <stdlib.h>
 #define BUF_SIZE 16
+#define LARGE_BUFFER_SIZE 2048
 
 /* Test program
  *
 int main(int argc, char* argv[]) {
        struct sockaddr_in server;
        char buf[BUF_SIZE] = "hello world";
+       char bulkdata[LARGE_BUFFER_SIZE] = "testme";
        char recv_buf[BUF_SIZE];
-       int sockfd, n;
+       int sockfd, n, inqemu;
        struct hostent* host;
-       if (argc != 3){
-               printf("udp_test hostname portnum\n");
-               return -1;
-       }
+
        // ignore the host for now
-       //host = gethostbyname(argv[1]); //hostname
+       if (argc == 2){
+               printf("in qemu client\n");
+               inqemu = 1;
+       }
+       else if (argc == 3){
+               printf("linux client\n");
+               inqemu = 0;
+       } else 
+       {
+               printf("incorrect number of parameters\n");
+       }
+       if (!inqemu){
+               host = gethostbyname(argv[1]); //hostname
+       }
        bzero(&server, sizeof(server));
        server.sin_family = AF_INET;
-       server.sin_port = htons(atoi(argv[2]));
-       server.sin_addr.s_addr = inet_addr("10.0.0.1"); //hardcoded server 
+       if (inqemu)
+               server.sin_port = htons(atoi(argv[1]));
+       else
+               server.sin_port = htons(atoi(argv[2]));
+
+
+       if (inqemu)
+               server.sin_addr.s_addr = inet_addr("10.0.0.1"); //hardcoded server 
+       else
+               memcpy(&server.sin_addr.s_addr, host->h_addr, host->h_length);
        
        char* printbuf = (char*)&server.sin_addr.s_addr;
        int size = sizeof(server.sin_addr.s_addr);      
@@ -49,20 +69,25 @@ int main(int argc, char* argv[]) {
 
        printf ("udp_test: sockfd %d \n", sockfd);
        int socklen = sizeof(server);
+       // sending large chunk of data of 2K, more than one frame
+       // int sendsize =  sendto(sockfd, bulkdata, LARGE_BUFFER_SIZE, 0, (struct sockaddr*) &server, socklen);
+
+       // sending a large chunk of data but fitting in one packet
+        //int sendsize =  sendto(sockfd, bulkdata, 500, 0, (struct sockaddr*) &server, socklen);
 
        int sendsize = sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr*) &server, socklen);
        printf("sendto returns %d, errno %d\n", sendsize, errno);
        //assume BUF_SIZE is larger than the packet.. so we will get to see what actually comes back..
        int j=0;
-       for (j=0; j<1; j++){
+       for (j=0; j<10; j++){
                strcpy(recv_buf, "DEADBEEFDEADBEE");
-               if (((n = recvfrom(sockfd, recv_buf, BUF_SIZE, 0, (struct sockaddr*) &server, &socklen))< 0)){
+               // if (((n = recvfrom(sockfd, recv_buf, BUF_SIZE, 0, (struct sockaddr*) &server, &socklen))< 0)){
+               if (((n = recvfrom(sockfd, recv_buf, 5, 0, (struct sockaddr*) &server, &socklen))< 0)){ // should discard if it is udp..
                        printf("recv failed\n");
                }
-               recv_buf[n-2] = 0; //null terminate
-               printf("recv %d with length %d from result %s\n", j,n,  recv_buf);
+               recv_buf[n-1] = 0; //null terminate
+               printf("[OUTPUT] recv %d with length %d from result %s\n", j,n,  recv_buf);
        }
-
-
+       while(1){;}
        close(sockfd);
 }