Enable the send thread.
authorSung-jae Park <nicesj.park@samsung.com>
Wed, 19 Dec 2012 13:04:08 +0000 (22:04 +0900)
committerSung-jae Park <nicesj.park@samsung.com>
Wed, 19 Dec 2012 14:27:04 +0000 (23:27 +0900)
Packet should be sent via ONE thread.

If it sent from the multiple threads,
we have to care the thread-safety.

If the packet's destination address is 'ZERO',
Pull up it to the server's main thread and handle it using the
method_table, or forward packet to the destination.

Change-Id: Ied696b921747e6dc009fc1c0be06cf587b0c6af8

include/packet.h
packaging/libcom-core.spec
src/com-core_packet-router.c
src/com-core_packet.c
src/packet.c

index cfbd838..abdb046 100644 (file)
@@ -60,6 +60,7 @@ extern const unsigned long const packet_destination(const struct packet *packet)
 extern int packet_set_destination(struct packet *packet, unsigned long destination);
 extern int packet_set_mask(struct packet *packet, unsigned long mask);
 extern unsigned long packet_mask(const struct packet *packet);
+extern int packet_swap_address(struct packet *packet);
 
 extern const int const packet_version(const struct packet *packet);
 extern const int const packet_payload_size(const struct packet *packet);
index 375a264..d289e57 100644 (file)
@@ -38,6 +38,7 @@ mkdir -p %{buildroot}/usr/share/license
 %manifest libcom-core.manifest
 %defattr(-,root,root,-)
 /usr/lib/*.so*
+/usr/share/license/*
 
 %files devel
 %defattr(-,root,root,-)
@@ -47,4 +48,3 @@ mkdir -p %{buildroot}/usr/share/license
 /usr/include/com-core/com-core_thread.h
 /usr/include/com-core/secure_socket.h
 /usr/lib/pkgconfig/*.pc
-/usr/share/license/*
index 7c748a2..483a6f9 100644 (file)
@@ -90,13 +90,19 @@ struct router {
 
        double timeout;
 
-       pthread_mutex_t packet_list_lock;
-       struct dlist *packet_list;
+       pthread_mutex_t recv_packet_list_lock;
+       struct dlist *recv_packet_list;
 
        pthread_mutex_t route_list_lock;
        struct dlist *route_list;
 
-       int evt_pipe[2];
+       pthread_mutex_t send_packet_list_lock;
+       struct dlist *send_packet_list;
+
+       int recv_pipe[2];
+       int send_pipe[2];
+
+       pthread_t send_thid;
 
        guint id;
 
@@ -136,8 +142,11 @@ static struct info {
        .error_list = NULL,
 };
 
-static inline struct packet *get_packet(struct router *router, int *handle, pid_t *pid);
-static inline int put_packet(struct router *router, int handle, struct packet *packet, pid_t pid);
+static inline struct packet *get_recv_packet(struct router *router, int *handle, pid_t *pid);
+static inline int put_recv_packet(struct router *router, int handle, struct packet *packet, pid_t pid);
+
+static inline struct packet *get_send_packet(struct router *router, int *handle);
+static inline int put_send_packet(struct router *router, int handle, struct packet *packet);
 
 /*!
  * \note
@@ -309,13 +318,25 @@ static inline struct router *find_router_by_handle(int handle)
        struct router *router;
 
        dlist_foreach(s_info.router_list, l, router) {
-               if (router->handle == handle)
+               if (router->is_server) {
+                       struct dlist *cl;
+                       struct client *client;
+                       /*!
+                        * Find the client list
+                        */
+                       dlist_foreach(router->info.server.client_list, cl, client) {
+                               if (client->handle == handle)
+                                       return router;
+                       }
+               } else if (router->handle == handle) {
                        return router;
+               }
        }
 
        return NULL;
 }
 
+
 /*!
  * \NOTE
  * Running thread: Main
@@ -331,7 +352,7 @@ static gboolean packet_cb(GIOChannel *src, GIOCondition cond, gpointer data)
        pid_t pid = (pid_t)-1;
 
        evt_handle = g_io_channel_unix_get_fd(src);
-       if (evt_handle != router->evt_pipe[PIPE_READ]) {
+       if (evt_handle != router->recv_pipe[PIPE_READ]) {
                ErrPrint("Invalid FD\n");
                goto errout;
        }
@@ -346,7 +367,7 @@ static gboolean packet_cb(GIOChannel *src, GIOCondition cond, gpointer data)
                goto errout;
        }
 
-       packet = get_packet(router, &handle, &pid);
+       packet = get_recv_packet(router, &handle, &pid);
        if (!packet) {
                (void)invoke_disconnected_cb(router, handle);
                clear_request_ctx(handle);
@@ -375,7 +396,7 @@ static gboolean packet_cb(GIOChannel *src, GIOCondition cond, gpointer data)
 
                        result_packet = router->service(handle, pid, packet, router->data);
                        if (result_packet) {
-                               ErrPrint("This is not need result packet\n");
+                               ErrPrint("Invalid result packet\n");
                                packet_destroy(result_packet);
                        }
                        break;
@@ -391,9 +412,9 @@ static gboolean packet_cb(GIOChannel *src, GIOCondition cond, gpointer data)
                                break;
                        }
 
-                       ret = com_core_send(handle, (void *)packet_data(result_packet), packet_size(result_packet), router->timeout);
-                       if (ret != packet_size(result_packet))
-                               ErrPrint("Failed to send reply packet. client can be block\n");
+                       ret = put_send_packet(router, handle, packet);
+                       if (ret < 0)
+                               ErrPrint("Failed to send a packet\n");
                        break;
                case PACKET_ERROR:
                default:
@@ -415,6 +436,10 @@ errout:
        return FALSE;
 }
 
+/*!
+ * \NOTE
+ * Running thread: Main
+ */
 static struct packet *service_handler(int handle, pid_t pid, const struct packet *packet, void *data)
 {
        struct method *table = data;
@@ -439,6 +464,116 @@ static struct packet *service_handler(int handle, pid_t pid, const struct packet
 }
 
 /*!
+ * \NOTE:
+ * Running thread: Server or Client or Send thread
+ */
+static inline int select_event(int handle, double timeout)
+{
+       fd_set set;
+       int status;
+       int ret;
+
+       FD_ZERO(&set);
+       FD_SET(handle, &set);
+
+       status = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+       if (status != 0)
+               ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+       if (timeout > 0.0f) {
+               struct timeval tv;
+
+               tv.tv_sec = (unsigned long)timeout;
+               tv.tv_usec = (timeout - (unsigned long)timeout) * 1000000u;
+
+               ret = select(handle + 1, NULL, &set, NULL, &tv);
+       } else if (timeout == 0.0f) {
+               ret = select(handle + 1, NULL, &set, NULL, NULL);
+       } else {
+               ErrPrint("Invalid timeout: %lf (it must be greater than 0.0)\n", timeout);
+               status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+               if (status != 0)
+                       ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+               return -EINVAL;
+       }
+
+       if (ret < 0) {
+               ret = -errno;
+               if (errno == EINTR) {
+                       DbgPrint("Select receives INTR\n");
+                       status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+                       return -EAGAIN;
+               }
+
+               ErrPrint("Error: %s\n", strerror(errno));
+               status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+               if (status != 0)
+                       ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+               return ret;
+       } else if (ret == 0) {
+               ErrPrint("Timeout expired\n");
+               status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+               if (status != 0)
+                       ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+               return -ETIMEDOUT;
+       }
+       status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+       if (status != 0)
+               ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+
+       if (!FD_ISSET(handle, &set)) {
+               ErrPrint("Unexpected handle is toggled\n");
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+/*!
+ * \NOTE
+ * Running thread: Send thread
+ */
+static void *send_main(void *data)
+{
+       struct router *router = data;
+       struct packet *packet;
+       int handle;
+       int ret;
+
+       while (1) {
+               /*!
+                * \note
+                * select event has cancel point
+                */
+               ret = select_event(router->send_pipe[PIPE_READ], 0.0f);
+               if (ret == -EAGAIN)
+                       continue;
+
+               if (ret < 0)
+                       break;
+
+               packet = get_send_packet(router, &handle);
+               if (!packet) {
+                       DbgPrint("NULL Packet. Terminate thread\n");
+                       break;
+               }
+
+               switch (packet_type(packet)) {
+               case PACKET_REQ:
+               case PACKET_REQ_NOACK:
+                       ret = com_core_send(handle, (void *)packet_data(packet), packet_size(packet), router->timeout);
+                       break;
+               default:
+                       ret = -EINVAL;
+                       break;
+               }
+
+               packet_destroy(packet);
+       }
+
+       return (void *)ret;
+}
+
+/*!
  * \NOTE
  * Running thread: Main
  */
@@ -454,7 +589,7 @@ static struct router *create_router(const char *sock, int handle, struct method
                return NULL;
        }
 
-       ret = pthread_mutex_init(&router->packet_list_lock, NULL);
+       ret = pthread_mutex_init(&router->recv_packet_list_lock, NULL);
        if (ret != 0) {
                ErrPrint("Mutex creation failed: %s\n", strerror(ret));
                free(router);
@@ -464,16 +599,38 @@ static struct router *create_router(const char *sock, int handle, struct method
        ret = pthread_mutex_init(&router->route_list_lock, NULL);
        if (ret != 0) {
                ErrPrint("Mutex craetion failed: %s\n", strerror(ret));
-               ret = pthread_mutex_destroy(&router->packet_list_lock);
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
                if (ret != 0)
                        ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               free(router);
+               return NULL;
+       }
+
+       ret = pthread_mutex_init(&router->send_packet_list_lock, NULL);
+       if (ret != 0) {
+               ErrPrint("Mutex creation failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->route_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               free(router);
                return NULL;
        }
 
        router->sock = strdup(sock);
        if (!router->sock) {
                ErrPrint("Heap: %s\n", strerror(errno));
-               ret = pthread_mutex_destroy(&router->packet_list_lock);
+               ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
                if (ret != 0)
                        ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
 
@@ -485,12 +642,43 @@ static struct router *create_router(const char *sock, int handle, struct method
                return NULL;
        }
 
-       ret = pipe2(router->evt_pipe, O_NONBLOCK | O_CLOEXEC);
+       ret = pipe2(router->recv_pipe, O_NONBLOCK | O_CLOEXEC);
        if (ret < 0) {
                ErrPrint("pipe2: %s\n", strerror(errno));
                free(router->sock);
 
-               ret = pthread_mutex_destroy(&router->packet_list_lock);
+               ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->route_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               free(router);
+               return NULL;
+       }
+
+       ret = pipe2(router->send_pipe, O_NONBLOCK | O_CLOEXEC);
+       if (ret < 0) {
+               ErrPrint("pipe2: %s\n", strerror(errno));
+               free(router->sock);
+
+               if (close(router->recv_pipe[0]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->recv_pipe[1]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
                if (ret != 0)
                        ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
 
@@ -506,13 +694,27 @@ static struct router *create_router(const char *sock, int handle, struct method
        router->service = service_handler;
        router->data = table;
 
-       gio = g_io_channel_unix_new(router->evt_pipe[PIPE_READ]);
+       gio = g_io_channel_unix_new(router->recv_pipe[PIPE_READ]);
        if (!gio) {
-               close(router->evt_pipe[PIPE_READ]);
-               close(router->evt_pipe[PIPE_WRITE]);
+               if (close(router->recv_pipe[PIPE_READ]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->send_pipe[PIPE_READ]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->send_pipe[PIPE_WRITE]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
                free(router->sock);
 
-               ret = pthread_mutex_destroy(&router->packet_list_lock);
+               ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
                if (ret != 0)
                        ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
 
@@ -534,11 +736,26 @@ static struct router *create_router(const char *sock, int handle, struct method
                        g_error_free(err);
                }
                g_io_channel_unref(gio);
-               close(router->evt_pipe[PIPE_READ]);
-               close(router->evt_pipe[PIPE_WRITE]);
+
+               if (close(router->recv_pipe[PIPE_READ]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->send_pipe[PIPE_READ]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
+               if (close(router->send_pipe[PIPE_WRITE]) < 0)
+                       ErrPrint("close: %s\n", strerror(errno));
+
                free(router->sock);
 
-               ret = pthread_mutex_destroy(&router->packet_list_lock);
+               ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
                if (ret != 0)
                        ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
 
@@ -553,28 +770,89 @@ static struct router *create_router(const char *sock, int handle, struct method
        g_io_channel_unref(gio);
 
        s_info.router_list = dlist_append(s_info.router_list, router);
+
+       ret = pthread_create(&router->send_thid, NULL, send_main, router);
+       if (ret != 0) {
+               ErrPrint("Failed to create a send thread: %s\n", strerror(ret));
+               dlist_remove_data(s_info.router_list, router);
+
+               g_source_remove(router->id);
+
+               if (close(router->recv_pipe[PIPE_READ]) < 0)
+                       ErrPrint("Close: %s\n", strerror(errno));
+
+               if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+                       ErrPrint("Close: %s\n", strerror(errno));
+
+               if (close(router->send_pipe[PIPE_READ]) < 0)
+                       ErrPrint("Close: %s\n", strerror(errno));
+
+               if (close(router->send_pipe[PIPE_WRITE]) < 0)
+                       ErrPrint("Close: %s\n", strerror(errno));
+
+               free(router->sock);
+
+               ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               ret = pthread_mutex_destroy(&router->route_list_lock);
+               if (ret != 0)
+                       ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+               free(router);
+               return NULL;
+       }
+
        return router;
 }
 
 /*!
  * \NOTE
  * Running thread: Main
+ *
+ * Before call this, every thread which uses this router object must has to be terminated.
  */
 static inline int destroy_router(struct router *router)
 {
        int handle;
        int ret;
 
+       ret = put_send_packet(router, -1, NULL);
+       DbgPrint("Put NULL Packet to terminate send thread (%d)\n", ret);
+
+       ret = pthread_join(router->send_thid, NULL);
+       if (ret != 0)
+               ErrPrint("Join: %s\n", strerror(ret));
+
        dlist_remove_data(s_info.router_list, router);
 
        if (router->id > 0)
                g_source_remove(router->id);
 
-       close(router->evt_pipe[PIPE_READ]);
-       close(router->evt_pipe[PIPE_WRITE]);
+       if (close(router->recv_pipe[PIPE_READ]) < 0)
+               ErrPrint("close: %s\n", strerror(errno));
+
+       if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+               ErrPrint("close: %s\n", strerror(errno));
+
+       if (close(router->send_pipe[PIPE_READ]) < 0)
+               ErrPrint("close: %s\n", strerror(errno));
+
+       if (close(router->send_pipe[PIPE_WRITE]) < 0)
+               ErrPrint("close: %s\n", strerror(errno));
+
        free(router->sock);
 
-       ret = pthread_mutex_destroy(&router->packet_list_lock);
+       ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+       if (ret != 0)
+               ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+       ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
        if (ret != 0)
                ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
 
@@ -627,10 +905,9 @@ static inline int route_packet(struct router *router, int handle, struct packet
                                 *
                                 * We have to optimize the processing time in the CRITICAL SECTION
                                 */
-                               ret = com_core_send(route->handle, (void *)packet_data(packet), packet_size(packet), router->timeout);
-                               if (ret != packet_size(packet)) {
+                               ret = put_send_packet(router, route->handle, packet);
+                               if (ret < 0)
                                        ErrPrint("Failed to send whole packet\n");
-                               }
 
                                processed++;
                        }
@@ -650,11 +927,47 @@ static inline int route_packet(struct router *router, int handle, struct packet
        return 0;
 }
 
+static inline int put_send_packet(struct router *router, int handle, struct packet *packet)
+{
+       if (packet) {
+               int status;
+               struct packet_item *item;
+
+               item = malloc(sizeof(*item));
+               if (!item) {
+                       packet_destroy(packet);
+                       return -ENOMEM;
+               }
+
+               item->packet = packet;
+               item->pid = (pid_t)-1;
+
+               status = pthread_mutex_lock(&router->send_packet_list_lock);
+               if (status != 0)
+                       ErrPrint("Failed to lock: %s\n", strerror(errno));
+
+               router->send_packet_list = dlist_append(router->send_packet_list, item);
+
+               status = pthread_mutex_unlock(&router->send_packet_list_lock);
+               if (status != 0)
+                       ErrPrint("Failed to unlock: %s\n", strerror(errno));
+       }
+
+       /*!
+        * \note
+        * Producing an event on event pipe
+        */
+       if (write(router->send_pipe[PIPE_WRITE], &handle, sizeof(handle)) != sizeof(handle))
+               ErrPrint("Failed to put an event: %s\n", strerror(errno));
+
+       return 0;
+}
+
 /*!
  * \NOTE
  * Running thread: Client / Server leaf thread
  */
-static inline int put_packet(struct router *router, int handle, struct packet *packet, pid_t pid)
+static inline int put_recv_packet(struct router *router, int handle, struct packet *packet, pid_t pid)
 {
        /*!
         * If a packet is NULL, the connection is terminated
@@ -672,14 +985,14 @@ static inline int put_packet(struct router *router, int handle, struct packet *p
                item->packet = packet;
                item->pid = pid;
 
-               status = pthread_mutex_lock(&router->packet_list_lock);
+               status = pthread_mutex_lock(&router->recv_packet_list_lock);
                if (status != 0)
                        ErrPrint("Failed to lock: %s\n", strerror(errno));
 
-               router->packet_list = dlist_append(router->packet_list, item);
+               router->recv_packet_list = dlist_append(router->recv_packet_list, item);
 
-               status = pthread_mutex_unlock(&router->packet_list_lock);
-               if (!status != 0)
+               status = pthread_mutex_unlock(&router->recv_packet_list_lock);
+               if (status != 0)
                        ErrPrint("Failed to unlock: %s\n", strerror(errno));
        }
 
@@ -687,7 +1000,7 @@ static inline int put_packet(struct router *router, int handle, struct packet *p
         * \note
         * Producing an event on event pipe
         */
-       if (write(router->evt_pipe[PIPE_WRITE], &handle, sizeof(handle)) != sizeof(handle))
+       if (write(router->recv_pipe[PIPE_WRITE], &handle, sizeof(handle)) != sizeof(handle))
                ErrPrint("Failed to put an event: %s\n", strerror(errno));
 
        return 0;
@@ -695,110 +1008,78 @@ static inline int put_packet(struct router *router, int handle, struct packet *p
 
 /*!
  * \NOTE
- * Running thread: Main thread
+ * Running thread: Send thread
  */
-static inline struct packet *get_packet(struct router *router, int *handle, pid_t *pid)
+static inline struct packet *get_send_packet(struct router *router, int *handle)
 {
        int status;
        struct packet *packet = NULL;
        struct dlist *l;
        struct packet_item *item;
 
-       status = pthread_mutex_lock(&router->packet_list_lock);
+       status = pthread_mutex_lock(&router->send_packet_list_lock);
        if (status != 0)
                ErrPrint("Failed to get lock: %s\n", strerror(errno));
 
-       l = dlist_nth(router->packet_list, 0);
+       l = dlist_nth(router->send_packet_list, 0);
        if (l) {
                item = dlist_data(l);
-               router->packet_list = dlist_remove(router->packet_list, l);
-
+               router->send_packet_list = dlist_remove(router->send_packet_list, l);
                packet = item->packet;
-               if (pid)
-                       *pid = item->pid;
-
                free(item);
        }
 
-       status = pthread_mutex_unlock(&router->packet_list_lock);
-       if (!status != 0)
+       status = pthread_mutex_unlock(&router->send_packet_list_lock);
+       if (status != 0)
                ErrPrint("Failed to unlock: %s\n", strerror(errno));
 
-       /*!
-        * \note
-        * Consuming an event from event pipe
-        * Even if we cannot get the packet(NULL), we should consuming event
-        * Because the NULL packet means disconnected
-        */
-       if (read(router->evt_pipe[PIPE_READ], handle, sizeof(*handle)) != sizeof(*handle))
+       if (read(router->send_pipe[PIPE_READ], handle, sizeof(*handle)) != sizeof(*handle))
                ErrPrint("Failed to get an event: %s\n", strerror(errno));
 
        return packet;
 }
 
 /*!
- * \NOTE:
- * Running thread: Server or Client thread
+ * \NOTE
+ * Running thread: Main thread
  */
-static inline int select_event(int handle, double timeout)
+static inline struct packet *get_recv_packet(struct router *router, int *handle, pid_t *pid)
 {
-       fd_set set;
        int status;
-       int ret;
-
-       FD_ZERO(&set);
-       FD_SET(handle, &set);
+       struct packet *packet = NULL;
+       struct dlist *l;
+       struct packet_item *item;
 
-       status = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+       status = pthread_mutex_lock(&router->recv_packet_list_lock);
        if (status != 0)
-               ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
-       if (timeout > 0.0f) {
-               struct timeval tv;
-
-               tv.tv_sec = (unsigned long)timeout;
-               tv.tv_usec = (timeout - (unsigned long)timeout) * 1000000u;
+               ErrPrint("Failed to get lock: %s\n", strerror(errno));
 
-               ret = select(handle + 1, NULL, &set, NULL, &tv);
-       } else if (timeout == 0.0f) {
-               ret = select(handle + 1, NULL, &set, NULL, NULL);
-       } else {
-               ErrPrint("Invalid timeout: %lf (it must be greater than 0.0)\n", timeout);
-               status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-               if (status != 0)
-                       ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
-               return -EINVAL;
-       }
+       l = dlist_nth(router->recv_packet_list, 0);
+       if (l) {
+               item = dlist_data(l);
+               router->recv_packet_list = dlist_remove(router->recv_packet_list, l);
 
-       if (ret < 0) {
-               ret = -errno;
-               if (errno == EINTR) {
-                       DbgPrint("Select receives INTR\n");
-                       status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-                       return -EAGAIN;
-               }
+               packet = item->packet;
+               if (pid)
+                       *pid = item->pid;
 
-               ErrPrint("Error: %s\n", strerror(errno));
-               status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-               if (status != 0)
-                       ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
-               return ret;
-       } else if (ret == 0) {
-               ErrPrint("Timeout expired\n");
-               status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
-               if (status != 0)
-                       ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
-               return -ETIMEDOUT;
+               free(item);
        }
-       status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+       status = pthread_mutex_unlock(&router->recv_packet_list_lock);
        if (status != 0)
-               ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+               ErrPrint("Failed to unlock: %s\n", strerror(errno));
 
-       if (!FD_ISSET(handle, &set)) {
-               ErrPrint("Unexpected handle is toggled\n");
-               return -EINVAL;
-       }
+       /*!
+        * \note
+        * Consuming an event from event pipe
+        * Even if we cannot get the packet(NULL), we should consuming event
+        * Because the NULL packet means disconnected
+        */
+       if (read(router->recv_pipe[PIPE_READ], handle, sizeof(*handle)) != sizeof(*handle))
+               ErrPrint("Failed to get an event: %s\n", strerror(errno));
 
-       return 0;
+       return packet;
 }
 
 static inline int build_packet(int handle, struct recv_ctx *ctx)
@@ -911,16 +1192,22 @@ static int router_common_main(struct router *router, int handle, struct recv_ctx
                }
 
                if (ctx->state == RECV_STATE_READY) {
-                       if (packet_flag(ctx->packet) & PACKET_FLAG_ROUTE)
+                       /*!
+                        * \NOTE
+                        *
+                        * If the destination address is ZERO,
+                        * Pull up the packet to this server.
+                        */
+                       if (packet_destination(ctx->packet))
                                route_packet(router, handle, ctx->packet);
                        else
-                               put_packet(router, handle, ctx->packet, ctx->pid);
+                               put_recv_packet(router, handle, ctx->packet, ctx->pid);
 
                        ctx->state = RECV_STATE_INIT;
                }
        }
 
-       put_packet(router, handle, NULL, ctx->pid);
+       put_recv_packet(router, handle, NULL, ctx->pid);
        return ret;
 }
 
@@ -928,7 +1215,7 @@ static int router_common_main(struct router *router, int handle, struct recv_ctx
  * \NOTE
  * Running thread: Server thread
  */
-static void *router_server_main(void *data)
+static void *server_main(void *data)
 {
        struct client *client = data;
        struct router *router = client->router;
@@ -948,7 +1235,7 @@ static void *router_server_main(void *data)
  * \NOTE
  * Running thread: Client thread
  */
-static void *router_client_main(void *data)
+static void *client_main(void *data)
 {
        struct router *router = data;
        struct recv_ctx ctx;
@@ -1019,7 +1306,7 @@ static gboolean accept_cb(GIOChannel *src, GIOCondition cond, gpointer data)
        client->router = router;
        router->info.server.client_list = dlist_append(router->info.server.client_list, client);
 
-       status = pthread_create(&client->thid, NULL, router_server_main, client);
+       status = pthread_create(&client->thid, NULL, server_main, client);
        if (status != 0) {
                ErrPrint("Thread creation failed: %s\n", strerror(status));
                dlist_remove_data(router->info.server.client_list, client);
@@ -1061,8 +1348,8 @@ EAPI int com_core_packet_router_server_create(const char *sock, double timeout,
 
        gio = g_io_channel_unix_new(router->handle);
        if (!gio) {
+               handle = destroy_router(router);
                secure_socket_destroy_handle(handle);
-               destroy_router(router);
                return -EIO;
        }
 
@@ -1078,8 +1365,8 @@ EAPI int com_core_packet_router_server_create(const char *sock, double timeout,
                }
                g_io_channel_unref(gio);
 
+               handle = destroy_router(router);
                secure_socket_destroy_handle(handle);
-               destroy_router(router);
                return -EIO;
        }
 
@@ -1110,28 +1397,28 @@ EAPI int com_core_packet_router_client_create(const char *sock, double timeout,
        router->timeout = timeout;
        router->is_server = 0;
 
-       status = pthread_mutex_init(&router->packet_list_lock, NULL);
+       status = pthread_mutex_init(&router->recv_packet_list_lock, NULL);
        if (status != 0) {
                ErrPrint("Mutex creation failed: %s\n", strerror(status));
 
+               handle = destroy_router(router);
                secure_socket_destroy_handle(handle);
-               destroy_router(router);
                return -EFAULT;
        }
 
        status = pthread_mutex_init(&router->route_list_lock, NULL);
        if (status != 0) {
                ErrPrint("Mutex creation failed: %s\n", strerror(status));
+               handle = destroy_router(router);
                secure_socket_destroy_handle(handle);
-               destroy_router(router);
                return -EFAULT;
        }
 
-       status = pthread_create(&router->info.client.thid, NULL, router_client_main, router);
+       status = pthread_create(&router->info.client.thid, NULL, client_main, router);
        if (status != 0) {
                ErrPrint("Thread creation failed: %s\n", strerror(status));
+               handle = destroy_router(router);
                secure_socket_destroy_handle(handle);
-               destroy_router(router);
                return -EFAULT;
        }
 
@@ -1207,9 +1494,8 @@ EAPI void *com_core_packet_router_destroy(int handle)
 
        data = router->data;
 
-       secure_socket_destroy_handle(router->handle);
-
-       destroy_router(router);
+       handle = destroy_router(router);
+       secure_socket_destroy_handle(handle);
 
        return data;
 }
@@ -1220,14 +1506,24 @@ EAPI void *com_core_packet_router_destroy(int handle)
  */
 EAPI int com_core_packet_router_async_send(int handle, struct packet *packet, double timeout, int (*recv_cb)(pid_t pid, int handle, const struct packet *packet, void *data), void *data)
 {
-       int ret;
        struct request_ctx *ctx;
+       struct router *router;
+       int ret;
+
+       if (handle < 0 || !packet)
+               return -EINVAL;
 
        if (packet_type(packet) != PACKET_REQ) {
                ErrPrint("Invalid packet - should be PACKET_REQ\n");
                return -EINVAL;
        }
 
+       router = find_router_by_handle(handle);
+       if (!router) {
+               ErrPrint("Router is not available\n");
+               return -EINVAL;
+       }
+
        ctx = create_request_ctx(handle);
        if (!ctx)
                return -ENOMEM;
@@ -1236,14 +1532,11 @@ EAPI int com_core_packet_router_async_send(int handle, struct packet *packet, do
        ctx->data = data;
        ctx->packet = packet_ref(packet);
 
-       ret = com_core_send(handle, (void *)packet_data(packet), packet_size(packet), timeout);
-       if (ret != packet_size(packet)) {
-               ErrPrint("Send failed. %d <> %d (handle: %d)\n", ret, packet_size(packet), handle);
+       ret = put_send_packet(router, handle, packet);
+       if (ret < 0)
                destroy_request_ctx(ctx);
-               return -EIO;
-       }
 
-       return 0;
+       return ret;
 }
 
 /*!
@@ -1252,7 +1545,18 @@ EAPI int com_core_packet_router_async_send(int handle, struct packet *packet, do
  */
 EAPI int com_core_packet_router_send_only(int handle, struct packet *packet)
 {
-       return com_core_packet_send_only(handle, packet);
+       struct router *router;
+
+       if (handle < 0 || !packet || packet_type(packet) != PACKET_REQ_NOACK)
+               return -EINVAL;
+
+       router = find_router_by_handle(handle);
+       if (!router) {
+               ErrPrint("Rouer is not available\n");
+               return -EINVAL;
+       }
+
+       return put_send_packet(router, handle, packet);
 }
 
 /*!
@@ -1272,7 +1576,10 @@ EAPI int com_core_packet_router_add_route(int handle, unsigned long address, int
 {
        struct router *router;
        struct route *route;
+       struct route *tmp;
        int status;
+       struct dlist *l;
+       int found = 0;
 
        if (handle < 0 || !address || h < 0)
                return -EINVAL;
@@ -1297,12 +1604,25 @@ EAPI int com_core_packet_router_add_route(int handle, unsigned long address, int
        if (status != 0)
                ErrPrint("Failed to lock: %s\n", strerror(status));
 
-       router->route_list = dlist_append(router->route_list, route);
+       dlist_foreach(router->route_list, l, tmp) {
+               if (tmp->address == address) {
+                       found = 1;
+                       break;
+               }
+       }
+
+       if (!found)
+               router->route_list = dlist_append(router->route_list, route);
 
        status = pthread_mutex_unlock(&router->route_list_lock);
        if (status != 0)
                ErrPrint("Failed to unlock: %s\n", strerror(status));
 
+       if (found) {
+               free(route);
+               return -EEXIST;
+       }
+
        return 0;
 }
 
@@ -1317,6 +1637,7 @@ EAPI int com_core_packet_router_del_route(int handle, unsigned long address)
        struct dlist *l;
        struct dlist *n;
        int status;
+       int found = 0;
 
        if (handle < 0 || !address)
                return -EINVAL;
@@ -1339,6 +1660,8 @@ EAPI int com_core_packet_router_del_route(int handle, unsigned long address)
 
                DbgPrint("Delete an entry from the table (%lu : %d)\n", route->address, route->handle);
                free(route);
+
+               found = 1;
                break;
        }
 
@@ -1346,9 +1669,13 @@ EAPI int com_core_packet_router_del_route(int handle, unsigned long address)
        if (status != 0)
                ErrPrint("Failed to unlock: %s\n", strerror(status));
 
-       return 0;
+       return found ? 0 : -ENOENT;
 }
 
+/*!
+ * \NOTE
+ * Running thread: Main
+ */
 EAPI int com_core_packet_router_update_route(int handle, unsigned long address, int h)
 {
        struct router *router;
index 6a887d5..5304c63 100644 (file)
@@ -405,6 +405,11 @@ EAPI int com_core_packet_async_send(int handle, struct packet *packet, double ti
        int ret;
        struct request_ctx *ctx;
 
+       if (handle < 0 || !packet) {
+               ErrPrint("Invalid argument\n");
+               return -EINVAL;
+       }
+
        if (packet_type(packet) != PACKET_REQ) {
                ErrPrint("Invalid packet - should be PACKET_REQ\n");
                return -EINVAL;
@@ -456,6 +461,11 @@ EAPI struct packet *com_core_packet_oneshot_send(const char *addr, struct packet
        void *ptr;
        int size;
 
+       if (!addr || !packet) {
+               ErrPrint("Invalid argument\n");
+               return NULL;
+       }
+
        fd = secure_socket_create_client(addr);
        if (fd < 0)
                return NULL;
index 302a839..4412cc4 100644 (file)
@@ -308,9 +308,8 @@ EAPI struct packet *packet_create_reply(const struct packet *packet, const char
        }
 
        result->state = VALID;
-
-       result->data->head.source = 0lu;
-       result->data->head.destination = 0lu;
+       result->data->head.source = packet->data->head.destination;
+       result->data->head.destination = packet->data->head.source;
        result->data->head.mask = 0xFFFFFFFF;
 
        result->data->head.seq = packet->data->head.seq;
@@ -327,6 +326,20 @@ EAPI struct packet *packet_create_reply(const struct packet *packet, const char
        return packet_ref(result);
 }
 
+EAPI int packet_swap_address(struct packet *packet)
+{
+       unsigned long tmp;
+
+       if (!packet || packet->state != VALID)
+               return -EINVAL;
+
+       tmp = packet->data->head.source;
+       packet->data->head.source = packet->data->head.destination;
+       packet->data->head.destination = tmp;
+
+       return 0;
+}
+
 EAPI struct packet *packet_create(const char *cmd, const char *fmt, ...)
 {
        struct packet *packet;