common: changed stream-transport to use fragbuf.
authorKrisztian Litkey <krisztian.litkey@intel.com>
Thu, 24 Jan 2013 18:07:50 +0000 (20:07 +0200)
committerKrisztian Litkey <kli@iki.fi>
Tue, 29 Jan 2013 15:49:33 +0000 (17:49 +0200)
src/common/stream-transport.c

index 6f73623..45fb981 100644 (file)
@@ -34,6 +34,7 @@
 #include <fcntl.h>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/ioctl.h>
 #include <netinet/in.h>
 #include <sys/un.h>
 #include <sys/uio.h>
@@ -42,6 +43,7 @@
 #include <murphy/common/mm.h>
 #include <murphy/common/log.h>
 #include <murphy/common/msg.h>
+#include <murphy/common/fragbuf.h>
 #include <murphy/common/transport.h>
 
 #define TCP4  "tcp4"
@@ -57,9 +59,7 @@ typedef struct {
     MRP_TRANSPORT_PUBLIC_FIELDS;         /* common transport fields */
     int             sock;                /* TCP socket */
     mrp_io_watch_t *iow;                 /* socket I/O watch */
-    void           *ibuf;                /* input buffer */
-    size_t          isize;               /* input buffer size */
-    size_t          idata;               /* amount of input data */
+    mrp_fragbuf_t  *buf;                 /* fragment buffer */
 } strm_t;
 
 
@@ -268,11 +268,19 @@ static int strm_createfrom(mrp_transport_t *mt, void *conn)
         }
 
         if (t->connected) {
-            events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
-            t->iow = mrp_add_io_watch(t->ml, t->sock, events, strm_recv_cb, t);
+            t->buf = mrp_fragbuf_create(TRUE, 0);
 
-            if (t->iow != NULL)
-                return TRUE;
+            if (t->buf != NULL) {
+                events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+                t->iow = mrp_add_io_watch(t->ml, t->sock, events,
+                                          strm_recv_cb, t);
+
+                if (t->iow != NULL)
+                    return TRUE;
+
+                mrp_fragbuf_destroy(t->buf);
+                t->buf = NULL;
+            }
         }
     }
 
@@ -323,8 +331,9 @@ static int strm_accept(mrp_transport_t *mt, mrp_transport_t *mlt)
 
     addrlen = sizeof(addr);
     t->sock = accept(lt->sock, &addr.any, &addrlen);
+    t->buf  = mrp_fragbuf_create(TRUE, 0);
 
-    if (t->sock >= 0) {
+    if (t->sock >= 0 && t->buf != NULL) {
         if (mt->flags & MRP_TRANSPORT_REUSEADDR) {
             on = 1;
             setsockopt(t->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
@@ -348,6 +357,8 @@ static int strm_accept(mrp_transport_t *mt, mrp_transport_t *mlt)
             t->sock = -1;
         }
     }
+    else
+        mrp_fragbuf_destroy(t->buf);
 
     return FALSE;
 }
@@ -360,10 +371,8 @@ static void strm_close(mrp_transport_t *mt)
     mrp_del_io_watch(t->iow);
     t->iow = NULL;
 
-    mrp_free(t->ibuf);
-    t->ibuf  = NULL;
-    t->isize = 0;
-    t->idata = 0;
+    mrp_fragbuf_destroy(t->buf);
+    t->buf = NULL;
 
     if (t->sock >= 0){
         close(t->sock);
@@ -375,12 +384,13 @@ static void strm_close(mrp_transport_t *mt)
 static void strm_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
                          mrp_io_event_t events, void *user_data)
 {
-    strm_t           *t  = (strm_t *)user_data;
+    strm_t          *t  = (strm_t *)user_data;
     mrp_transport_t *mt = (mrp_transport_t *)t;
-    uint32_t        *sizep, size;
-    ssize_t          n, space, left;
-    void            *data;
-    int              old, error;
+    void            *data, *buf;
+    uint32_t         pending;
+    size_t           size;
+    ssize_t          n;
+    int              error;
 
     MRP_UNUSED(ml);
     MRP_UNUSED(w);
@@ -395,22 +405,10 @@ static void strm_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
             return;
         }
 
-        /*
-         * enlarge the buffer if we're out of space
-         */
-    realloc:
-        if (t->idata == t->isize) {
-            if (t->isize > sizeof(size)) {
-                old       = t->isize;
-                sizep     = t->ibuf;
-                size      = sizeof(size) + ntohl(*sizep);
-                t->isize  = size;
-            }
-            else {
-                old      = 0;
-                t->isize = DEFAULT_SIZE;
-            }
-            if (!mrp_reallocz(t->ibuf, old, t->isize)) {
+        while (ioctl(fd, FIONREAD, &pending) == 0 && pending > 0) {
+            buf = mrp_fragbuf_alloc(t->buf, pending);
+
+            if (buf == NULL) {
                 error = ENOMEM;
             fatal_error:
             closed:
@@ -424,53 +422,30 @@ static void strm_recv_cb(mrp_mainloop_t *ml, mrp_io_watch_t *w, int fd,
                 t->check_destroy(mt);
                 return;
             }
-        }
-
 
-        space = t->isize - t->idata;
-        while ((n = read(fd, t->ibuf + t->idata, space)) > 0) {
-            t->idata += n;
-            left      = 0;
+            n = read(fd, buf, pending);
 
-            if (t->idata >= sizeof(size)) {
-                sizep = t->ibuf;
-                size  = ntohl(*sizep);
-
-                while (t->idata >= sizeof(size) + size) {
-                    data  = t->ibuf + sizeof(size);
-                    error = t->recv_data(mt, data, size, NULL, 0);
-
-                    if (error)
-                        goto fatal_error;
-
-                    if (t->check_destroy(mt))
-                        return;
-
-                    left = t->idata - (sizeof(size) + size);
-                    memmove(t->ibuf, t->ibuf + sizeof(size) + size, left);
-                    t->idata = left;
+            if (n >= 0) {
+                if (n < pending)
+                    mrp_fragbuf_trim(t->buf, buf, pending, n);
+            }
 
-                    if (t->idata >= sizeof(size)) {
-                        sizep = t->ibuf;
-                        size = ntohl(*sizep);
-                    }
-                    else
-                        size = (uint32_t)-1;
-                }
+            if (n < 0 && errno != EAGAIN) {
+                error = EIO;
+                goto fatal_error;
             }
+        }
 
-            /* no more data to read in this frame */
-            if (left == 0)
-                break;
+        data = NULL;
+        size = 0;
+        while (mrp_fragbuf_pull(t->buf, &data, &size)) {
+            error = t->recv_data(mt, data, size, NULL, 0);
 
-            space = t->isize - t->idata;
-            if (space == 0)
-                goto realloc;
-        }
+            if (error)
+                goto fatal_error;
 
-        if (n < 0 && errno != EAGAIN) {
-            error = EIO;
-            goto fatal_error;
+            if (t->check_destroy(mt))
+                return;
         }
     }
 
@@ -532,16 +507,23 @@ static int strm_connect(mrp_transport_t *mt, mrp_sockaddr_t *addr,
         return FALSE;
 
     if (connect(t->sock, &addr->any, addrlen) == 0) {
-        events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
-        t->iow = mrp_add_io_watch(t->ml, t->sock, events, strm_recv_cb, t);
+        t->buf = mrp_fragbuf_create(TRUE, 0);
 
-        if (t->iow != NULL) {
-            on = 1;
-            setsockopt(t->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
-            nb = 1;
-            fcntl(t->sock, F_SETFL, O_NONBLOCK, nb);
+        if (t->buf != NULL) {
+            events = MRP_IO_EVENT_IN | MRP_IO_EVENT_HUP;
+            t->iow = mrp_add_io_watch(t->ml, t->sock, events, strm_recv_cb, t);
 
-            return TRUE;
+            if (t->iow != NULL) {
+                on = 1;
+                setsockopt(t->sock, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
+                nb = 1;
+                fcntl(t->sock, F_SETFL, O_NONBLOCK, nb);
+
+                return TRUE;
+            }
+
+            mrp_fragbuf_destroy(t->buf);
+            t->buf = NULL;
         }
     }
 
@@ -564,6 +546,9 @@ static int strm_disconnect(mrp_transport_t *mt)
 
         shutdown(t->sock, SHUT_RDWR);
 
+        mrp_fragbuf_destroy(t->buf);
+        t->buf = NULL;
+
         return TRUE;
     }
     else
@@ -583,7 +568,7 @@ static int strm_send(mrp_transport_t *mt, mrp_msg_t *msg)
         size = mrp_msg_default_encode(msg, &buf);
 
         if (size >= 0) {
-            len = htonl(size);
+            len = htobe32(size);
             iov[0].iov_base = &len;
             iov[0].iov_len  = sizeof(len);
             iov[1].iov_base = buf;