Adding select support for basic socket udp receive.
authorDavid Zhu <yuzhu@cs.berkeley.edu>
Thu, 21 Apr 2011 00:35:29 +0000 (17:35 -0700)
committerDavid Zhu <yuzhu@cs.berkeley.edu>
Sun, 29 Apr 2012 00:09:31 +0000 (17:09 -0700)
Currently do not support descriptors that are waiting for writes,
since all of our writes to sockets are non-blocking due to udp.
In the future, this will likely change once I implement TCP.

kern/include/kthread.h
kern/include/ros/bits/syscall.h
kern/include/socket.h
kern/include/time.h
kern/include/vfs.h
kern/src/net/udp.c
kern/src/socket.c
kern/src/syscall.c
tests/udp_test.c
tools/compilers/gcc-glibc/glibc-2.14.1-ros/sysdeps/ros/select.c [new file with mode: 0644]

index 39fbfaf..5e2af11 100644 (file)
 
 struct proc;
 struct kthread;
+struct semaphore;
+struct semaphore_entry;
 TAILQ_HEAD(kthread_tailq, kthread);
+LIST_HEAD(semaphore_list, semaphore_entry);
+
 
 /* This captures the essence of a kernel context that we want to suspend.  When
  * a kthread is running, we make sure its stacktop is the default kernel stack,
@@ -30,6 +34,7 @@ struct kthread {
        /* ID, other shit, etc */
 };
 
+
 /* Semaphore for kthreads to sleep on.  0 or less means you need to sleep */
 struct semaphore {
        struct kthread_tailq            waiters;
@@ -37,6 +42,11 @@ struct semaphore {
        spinlock_t                                      lock;
 };
 
+struct semaphore_entry {
+       struct semaphore sem;
+       int fd;
+       LIST_ENTRY(semaphore_entry) link;
+};
 /* This doesn't have to be inline, but it doesn't matter for now */
 static inline void init_sem(struct semaphore *sem, int signals)
 {
index 2634723..38bad17 100644 (file)
@@ -45,6 +45,7 @@
 #define SYS_socket                                     40
 #define SYS_sendto                                     41
 #define SYS_recvfrom                           42
+#define SYS_select          43
 
 
 /* Platform specific syscalls */
index 83821f7..8561311 100644 (file)
@@ -6,6 +6,7 @@
 #include <atomic.h>
 #include <net/pbuf.h>
 #include <kthread.h>
+#include <vfs.h>
 // Just a couple of AF types that we might support
 #define AF_UNSPEC      0
 #define AF_UNIX                1       /* Unix domain sockets          */
@@ -47,6 +48,7 @@ enum sock_type {
     SOCK_DCCP   = 6,
     SOCK_PACKET = 10,
 };
+
 struct socket{
   //int so_count;       /* (b) reference count */
   short   so_type;        /* (a) generic type, see socket.h */
@@ -59,7 +61,9 @@ struct socket{
        void    *so_pcb;        /* protocol control block */
        struct pbuf_head recv_buff;
        struct pbuf_head send_buff;
-       struct semaphore sem;   /* semaphone to for a process to sleep on */
+       struct semaphore sem;
+       spinlock_t waiter_lock;
+       struct semaphore_list waiters;   /* semaphone to for a process to sleep on */
        
        //struct  vnet *so_vnet;      /* network stack instance */
        //struct  protosw *so_proto;  /* (a) protocol handle */
@@ -112,6 +116,8 @@ int send_datagram(struct socket* sock, struct iovec* iov, int flags);
 intreg_t sys_socket(struct proc *p, int socket_family, int socket_type, int protocol);
 intreg_t sys_sendto(struct proc *p, int socket, const void *buffer, size_t length, int flags, const struct sockaddr *dest_addr, socklen_t dest_len);
 intreg_t sys_recvfrom(struct proc *p, int socket, void *restrict buffer, size_t length, int flags, struct sockaddr *restrict address, socklen_t *restrict address_len);
+intreg_t sys_select(struct proc *p, int nfds, fd_set *readfds, fd_set *writefds,
+                               fd_set *exceptfds, struct timeval *timeout);
 
 
 #endif /* ROS_SOCKET_H */
index 111dc91..79371e1 100644 (file)
@@ -17,6 +17,11 @@ struct itimerspec {
   struct timespec  it_value;     /* Timer expiration */
 };
 
+struct timeval {
+       time_t tv_sec;          /* seconds */
+       time_t tv_usec; /* microseconds */
+};
+
 void udelay(uint64_t usec);
 uint64_t tsc2sec(uint64_t tsc_time);
 uint64_t tsc2msec(uint64_t tsc_time);
index 6ff1af0..af1c1db 100644 (file)
@@ -342,13 +342,22 @@ struct vfsmount {
  * could just use the fd_array to check for openness instead of the bitmask,
  * but eventually we might want to use the bitmasks for other things (like
  * which files are close_on_exec. */
-struct fd_set {
+
+typedef struct fd_set {
     uint8_t fds_bits[BYTES_FOR_BITMASK(NR_FILE_DESC_MAX)];
-};
+} fd_set;
+
+
 struct small_fd_set {
     uint8_t fds_bits[BYTES_FOR_BITMASK(NR_FILE_DESC_DEFAULT)];
 };
 
+/* Helper macros to manage fd_sets */
+#define FD_SET(n, p)  ((p)->fds_bits[(n)/8] |=  (1 << ((n) & 7)))
+#define FD_CLR(n, p)  ((p)->fds_bits[(n)/8] &= ~(1 << ((n) & 7)))
+#define FD_ISSET(n,p) ((p)->fds_bits[(n)/8] &   (1 << ((n) & 7)))
+#define FD_ZERO(p)    memset((void*)(p),0,sizeof(*(p)))
+
 /* Describes an open file.  We need this, since the FD flags are supposed to be
  * per file descriptor, not per file (like the file status flags). */
 struct file_desc {
@@ -368,7 +377,7 @@ struct files_struct {
        struct file_desc                        fd_array[NR_OPEN_FILES_DEFAULT];
 };
 
-/* Process specific filesysten info */
+/* Process specific filesystem info */
 struct fs_struct {
        spinlock_t                                      lock;
        int                                                     umask;
index 3c2d27e..ca398f9 100644 (file)
@@ -276,7 +276,6 @@ int udp_input(struct pbuf *p){
        }
 
        /* checksum check */
-       // HERE!
   if (udphdr->checksum != 0) {
     if (inet_chksum_pseudo(p, (iphdr->src_addr), (iphdr->dst_addr), 
                                 IPPROTO_UDP, p->tot_len) != 0){
@@ -293,11 +292,29 @@ int udp_input(struct pbuf *p){
                struct socket *sock = pcb->pcbsock;
                attach_pbuf(p, &sock->recv_buff);
                struct kthread *kthread;
+               /* First notify any blocking recv calls,
+                * then notify anyone who might be waiting in a select
+                */ 
                // multiple people might be waiting on the socket here..
                kthread = __up_sem(&(sock->sem), FALSE);
                if (kthread) {
                         send_kernel_message(core_id(), (amr_t)wrap_restart_kthread, (long)kthread, 0, 0,
                                                                                                  KMSG_ROUTINE);
+               } else {
+                       // wake up all waiters
+                       struct semaphore_entry *sentry, *sentry_tmp;
+                       spin_lock(&sock->waiter_lock);
+                 LIST_FOREACH_SAFE(sentry, &(sock->waiters), link, sentry_tmp){
+                               //should only wake up one waiter
+                               kthread = __up_sem(&sentry->sem, true);
+                               if (kthread){
+                               send_kernel_message(core_id(), (amr_t)wrap_restart_kthread, (long)kthread, 0, 0,
+                                                                                                 KMSG_ROUTINE);
+                               }
+                               LIST_REMOVE(sentry, link);
+                               /* do not need to free since all the sentry are stack-based vars */
+                       }
+                       spin_unlock(&sock->waiter_lock);
                }
                // the attaching of pbuf should have increfed pbuf ref, so free is simply a decref
                pbuf_free(p);
index 9240abd..f83a4de 100644 (file)
@@ -9,6 +9,7 @@
 #include <ros/common.h>
 #include <socket.h>
 #include <vfs.h>
+#include <time.h>
 #include <kref.h>
 #include <syscall.h>
 #include <sys/uio.h>
@@ -18,6 +19,8 @@
 #include <net/udp.h>
 #include <net/pbuf.h>
 #include <umem.h>
+#include <kthread.h>
+#include <bitmask.h>
 /*
  *TODO: Figure out which socket.h is used where
  *There are several socket.h in kern, and a couple more in glibc. Perhaps the glibc ones
@@ -72,6 +75,8 @@ struct socket* alloc_sock(int socket_family, int socket_type, int protocol){
        pbuf_head_init(&newsock->recv_buff);
        pbuf_head_init(&newsock->send_buff);
        init_sem(&newsock->sem, 0);
+       spinlock_init(&newsock->waiter_lock);
+       LIST_INIT(&newsock->waiters);
        if (socket_type == SOCK_DGRAM){
                newsock->so_pcb = udp_new();
                /* back link */
@@ -133,7 +138,6 @@ intreg_t sys_socket(struct proc *p, int socket_family, int socket_type, int prot
        return fd;
 }
 intreg_t send_iov(struct socket* sock, struct iovec* iov, int flags){
-       
        // COPY_COUNT: for each iov, copy into mbuf, and send
        // should not copy here, copy in the protocol..
        // should be esomething like this sock->so_proto->pr_send(sock, iov, flags);
@@ -268,3 +272,72 @@ intreg_t sys_recvfrom(struct proc *p, int socket, void *restrict buffer, size_t
        else
                return copied;
 }
+
+static int selscan(int maxfdp1, fd_set *readset_in, fd_set *writeset_in, fd_set *exceptset_in,
+             fd_set *readset_out, fd_set *writeset_out, fd_set *exceptset_out){
+       return 0;
+}
+
+/* TODO: Start respecting the time out value */ 
+/* TODO: start respecting writefds and exceptfds */
+intreg_t sys_select(struct proc *p, int nfds, fd_set *readfds, fd_set *writefds,
+                               fd_set *exceptfds, struct timeval *timeout){
+       /* Create a semaphore */
+       struct semaphore_entry read_sem; 
+
+       init_sem(&(read_sem.sem), 0);
+
+       /* insert into the sem list of a fd / socket */
+       int low_fd = 0;
+       for (int i = low_fd; i< nfds; i++) {
+               if(FD_ISSET(i, readfds)){
+                 struct socket* sock = getsocket(p, i);
+                       /* if the fd is not open or if the file descriptor is not a socket 
+                        * go to the next in the fd set 
+                        */
+                       if (sock == NULL) continue;
+                       /* for each file that is open, insert this semaphore to be woken up when there
+                       * is data available to be read
+                       */
+                       spin_lock(&sock->waiter_lock);
+                       LIST_INSERT_HEAD(&sock->waiters, &read_sem, link);
+                       spin_unlock(&sock->waiter_lock);
+               }
+       }
+       /* At this point wait on the semaphore */
+  sleep_on(&(read_sem.sem));
+       /* someone woke me up, so walk through the list of descriptors and find one that is ready */
+       /* remove itself from all the lists that it is waiting on */
+       for (int i = low_fd; i<nfds; i++) {
+               if (FD_ISSET(i, readfds)){
+                       struct socket* sock = getsocket(p,i);
+                       if (sock == NULL) continue;
+                       spin_lock(&sock->waiter_lock);
+                       LIST_REMOVE(&read_sem, link);
+                       spin_unlock(&sock->waiter_lock);
+               }
+       }
+       fd_set readout, writeout, exceptout;
+       FD_ZERO(&readout);
+       FD_ZERO(&writeout);
+       FD_ZERO(&exceptout);
+       for (int i = low_fd; i< nfds; i ++){
+               if (readfds && FD_ISSET(i, readfds)){
+                 struct socket* sock = getsocket(p, i);
+                       if ((sock->recv_buff).qlen > 0){
+                               FD_SET(i, &readout);
+                       }
+                       /* if the socket is ready, then we can return it */
+               }
+       }
+       if (readfds)
+               memcpy(readfds, &readout, sizeof(*readfds));
+       if (writefds)
+               memcpy(writefds, &writeout, sizeof(*writefds));
+       if (exceptfds)
+               memcpy(readfds, &readout, sizeof(*readfds));
+
+       /* Sleep on that semaphore */
+       /* Somehow get these file descriptors to wake me up when there is new data */
+       return 0;
+}
index 3817435..28ff88c 100644 (file)
@@ -1386,6 +1386,8 @@ const static struct sys_table_entry syscall_table[] = {
        [SYS_socket] ={(syscall_t)sys_socket, "socket"},
        [SYS_sendto] ={(syscall_t)sys_sendto, "sendto"},
        [SYS_recvfrom] ={(syscall_t)sys_recvfrom, "recvfrom"},
+       [SYS_select] ={(syscall_t)sys_select, "select"},
+
 
        [SYS_read] = {(syscall_t)sys_read, "read"},
        [SYS_write] = {(syscall_t)sys_write, "write"},
index 88665a2..3116cfb 100644 (file)
@@ -73,15 +73,25 @@ int main(int argc, char* argv[]) {
        // int sendsize =  sendto(sockfd, bulkdata, LARGE_BUFFER_SIZE, 0, (struct sockaddr*) &server, socklen);
 
        // sending a large chunk of data but fitting in one packet
-        //int sendsize =  sendto(sockfd, bulkdata, 500, 0, (struct sockaddr*) &server, socklen);
-
+       //int sendsize =  sendto(sockfd, bulkdata, 500, 0, (struct sockaddr*) &server, socklen);
+       fd_set readset;
        int sendsize = sendto(sockfd, buf, strlen(buf), 0, (struct sockaddr*) &server, socklen);
        printf("sendto returns %d, errno %d\n", sendsize, errno);
        //assume BUF_SIZE is larger than the packet.. so we will get to see what actually comes back..
        int j=0;
+       int result;
        for (j=0; j<10; j++){
                strcpy(recv_buf, "DEADBEEFDEADBEE");
-               // if (((n = recvfrom(sockfd, recv_buf, BUF_SIZE, 0, (struct sockaddr*) &server, &socklen))< 0)){
+               // select before a blocking receive
+               do {
+                       FD_ZERO(&readset);
+                       FD_SET(sockfd, &readset);
+                       result = select(sockfd + 1, &readset, NULL, NULL, NULL);
+                       printf("select result %d \n", result);
+                       printf("readset %d \n", FD_ISSET(sockfd, &readset));
+               } while (result == -1 && errno == EINTR);
+               // configure recvfrom not to block when there is 
+
                if (((n = recvfrom(sockfd, recv_buf, 5, 0, (struct sockaddr*) &server, &socklen))< 0)){ // should discard if it is udp..
                        printf("recv failed\n");
                }
diff --git a/tools/compilers/gcc-glibc/glibc-2.14.1-ros/sysdeps/ros/select.c b/tools/compilers/gcc-glibc/glibc-2.14.1-ros/sysdeps/ros/select.c
new file mode 100644 (file)
index 0000000..446b4ae
--- /dev/null
@@ -0,0 +1,16 @@
+#include <sysdep.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <stdint.h>
+#include <unistd.h>
+#include <stddef.h>
+#include <sys/socket.h>
+#include <ros/syscall.h>
+
+int __select(int nfds, fd_set *readfds, fd_set *writefds,
+           fd_set *exceptfds, struct timeval *timeout) {
+       return ros_syscall(SYS_select, nfds, readfds, writefds, exceptfds, timeout, 0);
+}
+
+libc_hidden_def (__select)
+weak_alias (__select, select)