double timeout;
- pthread_mutex_t packet_list_lock;
- struct dlist *packet_list;
+ pthread_mutex_t recv_packet_list_lock;
+ struct dlist *recv_packet_list;
pthread_mutex_t route_list_lock;
struct dlist *route_list;
- int evt_pipe[2];
+ pthread_mutex_t send_packet_list_lock;
+ struct dlist *send_packet_list;
+
+ int recv_pipe[2];
+ int send_pipe[2];
+
+ pthread_t send_thid;
guint id;
.error_list = NULL,
};
-static inline struct packet *get_packet(struct router *router, int *handle, pid_t *pid);
-static inline int put_packet(struct router *router, int handle, struct packet *packet, pid_t pid);
+static inline struct packet *get_recv_packet(struct router *router, int *handle, pid_t *pid);
+static inline int put_recv_packet(struct router *router, int handle, struct packet *packet, pid_t pid);
+
+static inline struct packet *get_send_packet(struct router *router, int *handle);
+static inline int put_send_packet(struct router *router, int handle, struct packet *packet);
/*!
* \note
struct router *router;
dlist_foreach(s_info.router_list, l, router) {
- if (router->handle == handle)
+ if (router->is_server) {
+ struct dlist *cl;
+ struct client *client;
+ /*!
+ * Find the client list
+ */
+ dlist_foreach(router->info.server.client_list, cl, client) {
+ if (client->handle == handle)
+ return router;
+ }
+ } else if (router->handle == handle) {
return router;
+ }
}
return NULL;
}
+
/*!
* \NOTE
* Running thread: Main
pid_t pid = (pid_t)-1;
evt_handle = g_io_channel_unix_get_fd(src);
- if (evt_handle != router->evt_pipe[PIPE_READ]) {
+ if (evt_handle != router->recv_pipe[PIPE_READ]) {
ErrPrint("Invalid FD\n");
goto errout;
}
goto errout;
}
- packet = get_packet(router, &handle, &pid);
+ packet = get_recv_packet(router, &handle, &pid);
if (!packet) {
(void)invoke_disconnected_cb(router, handle);
clear_request_ctx(handle);
result_packet = router->service(handle, pid, packet, router->data);
if (result_packet) {
- ErrPrint("This is not need result packet\n");
+ ErrPrint("Invalid result packet\n");
packet_destroy(result_packet);
}
break;
break;
}
- ret = com_core_send(handle, (void *)packet_data(result_packet), packet_size(result_packet), router->timeout);
- if (ret != packet_size(result_packet))
- ErrPrint("Failed to send reply packet. client can be block\n");
+ ret = put_send_packet(router, handle, packet);
+ if (ret < 0)
+ ErrPrint("Failed to send a packet\n");
break;
case PACKET_ERROR:
default:
return FALSE;
}
+/*!
+ * \NOTE
+ * Running thread: Main
+ */
static struct packet *service_handler(int handle, pid_t pid, const struct packet *packet, void *data)
{
struct method *table = data;
}
/*!
+ * \NOTE:
+ * Running thread: Server or Client or Send thread
+ */
+static inline int select_event(int handle, double timeout)
+{
+ fd_set set;
+ int status;
+ int ret;
+
+ FD_ZERO(&set);
+ FD_SET(handle, &set);
+
+ status = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ if (status != 0)
+ ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+ if (timeout > 0.0f) {
+ struct timeval tv;
+
+ tv.tv_sec = (unsigned long)timeout;
+ tv.tv_usec = (timeout - (unsigned long)timeout) * 1000000u;
+
+ ret = select(handle + 1, NULL, &set, NULL, &tv);
+ } else if (timeout == 0.0f) {
+ ret = select(handle + 1, NULL, &set, NULL, NULL);
+ } else {
+ ErrPrint("Invalid timeout: %lf (it must be greater than 0.0)\n", timeout);
+ status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ if (status != 0)
+ ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+ return -EINVAL;
+ }
+
+ if (ret < 0) {
+ ret = -errno;
+ if (errno == EINTR) {
+ DbgPrint("Select receives INTR\n");
+ status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ return -EAGAIN;
+ }
+
+ ErrPrint("Error: %s\n", strerror(errno));
+ status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ if (status != 0)
+ ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+ return ret;
+ } else if (ret == 0) {
+ ErrPrint("Timeout expired\n");
+ status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ if (status != 0)
+ ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+ return -ETIMEDOUT;
+ }
+ status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+ if (status != 0)
+ ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+
+ if (!FD_ISSET(handle, &set)) {
+ ErrPrint("Unexpected handle is toggled\n");
+ return -EINVAL;
+ }
+
+ return 0;
+}
+
+/*!
+ * \NOTE
+ * Running thread: Send thread
+ */
+static void *send_main(void *data)
+{
+ struct router *router = data;
+ struct packet *packet;
+ int handle;
+ int ret;
+
+ while (1) {
+ /*!
+ * \note
+ * select event has cancel point
+ */
+ ret = select_event(router->send_pipe[PIPE_READ], 0.0f);
+ if (ret == -EAGAIN)
+ continue;
+
+ if (ret < 0)
+ break;
+
+ packet = get_send_packet(router, &handle);
+ if (!packet) {
+ DbgPrint("NULL Packet. Terminate thread\n");
+ break;
+ }
+
+ switch (packet_type(packet)) {
+ case PACKET_REQ:
+ case PACKET_REQ_NOACK:
+ ret = com_core_send(handle, (void *)packet_data(packet), packet_size(packet), router->timeout);
+ break;
+ default:
+ ret = -EINVAL;
+ break;
+ }
+
+ packet_destroy(packet);
+ }
+
+ return (void *)ret;
+}
+
+/*!
* \NOTE
* Running thread: Main
*/
return NULL;
}
- ret = pthread_mutex_init(&router->packet_list_lock, NULL);
+ ret = pthread_mutex_init(&router->recv_packet_list_lock, NULL);
if (ret != 0) {
ErrPrint("Mutex creation failed: %s\n", strerror(ret));
free(router);
ret = pthread_mutex_init(&router->route_list_lock, NULL);
if (ret != 0) {
ErrPrint("Mutex craetion failed: %s\n", strerror(ret));
- ret = pthread_mutex_destroy(&router->packet_list_lock);
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
if (ret != 0)
ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ free(router);
+ return NULL;
+ }
+
+ ret = pthread_mutex_init(&router->send_packet_list_lock, NULL);
+ if (ret != 0) {
+ ErrPrint("Mutex creation failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->route_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ free(router);
return NULL;
}
router->sock = strdup(sock);
if (!router->sock) {
ErrPrint("Heap: %s\n", strerror(errno));
- ret = pthread_mutex_destroy(&router->packet_list_lock);
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
if (ret != 0)
ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
return NULL;
}
- ret = pipe2(router->evt_pipe, O_NONBLOCK | O_CLOEXEC);
+ ret = pipe2(router->recv_pipe, O_NONBLOCK | O_CLOEXEC);
if (ret < 0) {
ErrPrint("pipe2: %s\n", strerror(errno));
free(router->sock);
- ret = pthread_mutex_destroy(&router->packet_list_lock);
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->route_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ free(router);
+ return NULL;
+ }
+
+ ret = pipe2(router->send_pipe, O_NONBLOCK | O_CLOEXEC);
+ if (ret < 0) {
+ ErrPrint("pipe2: %s\n", strerror(errno));
+ free(router->sock);
+
+ if (close(router->recv_pipe[0]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->recv_pipe[1]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
if (ret != 0)
ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
router->service = service_handler;
router->data = table;
- gio = g_io_channel_unix_new(router->evt_pipe[PIPE_READ]);
+ gio = g_io_channel_unix_new(router->recv_pipe[PIPE_READ]);
if (!gio) {
- close(router->evt_pipe[PIPE_READ]);
- close(router->evt_pipe[PIPE_WRITE]);
+ if (close(router->recv_pipe[PIPE_READ]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_READ]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
free(router->sock);
- ret = pthread_mutex_destroy(&router->packet_list_lock);
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
if (ret != 0)
ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
g_error_free(err);
}
g_io_channel_unref(gio);
- close(router->evt_pipe[PIPE_READ]);
- close(router->evt_pipe[PIPE_WRITE]);
+
+ if (close(router->recv_pipe[PIPE_READ]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_READ]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
free(router->sock);
- ret = pthread_mutex_destroy(&router->packet_list_lock);
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
if (ret != 0)
ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
g_io_channel_unref(gio);
s_info.router_list = dlist_append(s_info.router_list, router);
+
+ ret = pthread_create(&router->send_thid, NULL, send_main, router);
+ if (ret != 0) {
+ ErrPrint("Failed to create a send thread: %s\n", strerror(ret));
+ dlist_remove_data(s_info.router_list, router);
+
+ g_source_remove(router->id);
+
+ if (close(router->recv_pipe[PIPE_READ]) < 0)
+ ErrPrint("Close: %s\n", strerror(errno));
+
+ if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("Close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_READ]) < 0)
+ ErrPrint("Close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("Close: %s\n", strerror(errno));
+
+ free(router->sock);
+
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->route_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ free(router);
+ return NULL;
+ }
+
return router;
}
/*!
* \NOTE
* Running thread: Main
+ *
+ * Before call this, every thread which uses this router object must has to be terminated.
*/
static inline int destroy_router(struct router *router)
{
int handle;
int ret;
+ ret = put_send_packet(router, -1, NULL);
+ DbgPrint("Put NULL Packet to terminate send thread (%d)\n", ret);
+
+ ret = pthread_join(router->send_thid, NULL);
+ if (ret != 0)
+ ErrPrint("Join: %s\n", strerror(ret));
+
dlist_remove_data(s_info.router_list, router);
if (router->id > 0)
g_source_remove(router->id);
- close(router->evt_pipe[PIPE_READ]);
- close(router->evt_pipe[PIPE_WRITE]);
+ if (close(router->recv_pipe[PIPE_READ]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->recv_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_READ]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
+ if (close(router->send_pipe[PIPE_WRITE]) < 0)
+ ErrPrint("close: %s\n", strerror(errno));
+
free(router->sock);
- ret = pthread_mutex_destroy(&router->packet_list_lock);
+ ret = pthread_mutex_destroy(&router->send_packet_list_lock);
+ if (ret != 0)
+ ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
+
+ ret = pthread_mutex_destroy(&router->recv_packet_list_lock);
if (ret != 0)
ErrPrint("Mutex destroy failed: %s\n", strerror(ret));
*
* We have to optimize the processing time in the CRITICAL SECTION
*/
- ret = com_core_send(route->handle, (void *)packet_data(packet), packet_size(packet), router->timeout);
- if (ret != packet_size(packet)) {
+ ret = put_send_packet(router, route->handle, packet);
+ if (ret < 0)
ErrPrint("Failed to send whole packet\n");
- }
processed++;
}
return 0;
}
+static inline int put_send_packet(struct router *router, int handle, struct packet *packet)
+{
+ if (packet) {
+ int status;
+ struct packet_item *item;
+
+ item = malloc(sizeof(*item));
+ if (!item) {
+ packet_destroy(packet);
+ return -ENOMEM;
+ }
+
+ item->packet = packet;
+ item->pid = (pid_t)-1;
+
+ status = pthread_mutex_lock(&router->send_packet_list_lock);
+ if (status != 0)
+ ErrPrint("Failed to lock: %s\n", strerror(errno));
+
+ router->send_packet_list = dlist_append(router->send_packet_list, item);
+
+ status = pthread_mutex_unlock(&router->send_packet_list_lock);
+ if (status != 0)
+ ErrPrint("Failed to unlock: %s\n", strerror(errno));
+ }
+
+ /*!
+ * \note
+ * Producing an event on event pipe
+ */
+ if (write(router->send_pipe[PIPE_WRITE], &handle, sizeof(handle)) != sizeof(handle))
+ ErrPrint("Failed to put an event: %s\n", strerror(errno));
+
+ return 0;
+}
+
/*!
* \NOTE
* Running thread: Client / Server leaf thread
*/
-static inline int put_packet(struct router *router, int handle, struct packet *packet, pid_t pid)
+static inline int put_recv_packet(struct router *router, int handle, struct packet *packet, pid_t pid)
{
/*!
* If a packet is NULL, the connection is terminated
item->packet = packet;
item->pid = pid;
- status = pthread_mutex_lock(&router->packet_list_lock);
+ status = pthread_mutex_lock(&router->recv_packet_list_lock);
if (status != 0)
ErrPrint("Failed to lock: %s\n", strerror(errno));
- router->packet_list = dlist_append(router->packet_list, item);
+ router->recv_packet_list = dlist_append(router->recv_packet_list, item);
- status = pthread_mutex_unlock(&router->packet_list_lock);
- if (!status != 0)
+ status = pthread_mutex_unlock(&router->recv_packet_list_lock);
+ if (status != 0)
ErrPrint("Failed to unlock: %s\n", strerror(errno));
}
* \note
* Producing an event on event pipe
*/
- if (write(router->evt_pipe[PIPE_WRITE], &handle, sizeof(handle)) != sizeof(handle))
+ if (write(router->recv_pipe[PIPE_WRITE], &handle, sizeof(handle)) != sizeof(handle))
ErrPrint("Failed to put an event: %s\n", strerror(errno));
return 0;
/*!
* \NOTE
- * Running thread: Main thread
+ * Running thread: Send thread
*/
-static inline struct packet *get_packet(struct router *router, int *handle, pid_t *pid)
+static inline struct packet *get_send_packet(struct router *router, int *handle)
{
int status;
struct packet *packet = NULL;
struct dlist *l;
struct packet_item *item;
- status = pthread_mutex_lock(&router->packet_list_lock);
+ status = pthread_mutex_lock(&router->send_packet_list_lock);
if (status != 0)
ErrPrint("Failed to get lock: %s\n", strerror(errno));
- l = dlist_nth(router->packet_list, 0);
+ l = dlist_nth(router->send_packet_list, 0);
if (l) {
item = dlist_data(l);
- router->packet_list = dlist_remove(router->packet_list, l);
-
+ router->send_packet_list = dlist_remove(router->send_packet_list, l);
packet = item->packet;
- if (pid)
- *pid = item->pid;
-
free(item);
}
- status = pthread_mutex_unlock(&router->packet_list_lock);
- if (!status != 0)
+ status = pthread_mutex_unlock(&router->send_packet_list_lock);
+ if (status != 0)
ErrPrint("Failed to unlock: %s\n", strerror(errno));
- /*!
- * \note
- * Consuming an event from event pipe
- * Even if we cannot get the packet(NULL), we should consuming event
- * Because the NULL packet means disconnected
- */
- if (read(router->evt_pipe[PIPE_READ], handle, sizeof(*handle)) != sizeof(*handle))
+ if (read(router->send_pipe[PIPE_READ], handle, sizeof(*handle)) != sizeof(*handle))
ErrPrint("Failed to get an event: %s\n", strerror(errno));
return packet;
}
/*!
- * \NOTE:
- * Running thread: Server or Client thread
+ * \NOTE
+ * Running thread: Main thread
*/
-static inline int select_event(int handle, double timeout)
+static inline struct packet *get_recv_packet(struct router *router, int *handle, pid_t *pid)
{
- fd_set set;
int status;
- int ret;
-
- FD_ZERO(&set);
- FD_SET(handle, &set);
+ struct packet *packet = NULL;
+ struct dlist *l;
+ struct packet_item *item;
- status = pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
+ status = pthread_mutex_lock(&router->recv_packet_list_lock);
if (status != 0)
- ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
- if (timeout > 0.0f) {
- struct timeval tv;
-
- tv.tv_sec = (unsigned long)timeout;
- tv.tv_usec = (timeout - (unsigned long)timeout) * 1000000u;
+ ErrPrint("Failed to get lock: %s\n", strerror(errno));
- ret = select(handle + 1, NULL, &set, NULL, &tv);
- } else if (timeout == 0.0f) {
- ret = select(handle + 1, NULL, &set, NULL, NULL);
- } else {
- ErrPrint("Invalid timeout: %lf (it must be greater than 0.0)\n", timeout);
- status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- if (status != 0)
- ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
- return -EINVAL;
- }
+ l = dlist_nth(router->recv_packet_list, 0);
+ if (l) {
+ item = dlist_data(l);
+ router->recv_packet_list = dlist_remove(router->recv_packet_list, l);
- if (ret < 0) {
- ret = -errno;
- if (errno == EINTR) {
- DbgPrint("Select receives INTR\n");
- status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- return -EAGAIN;
- }
+ packet = item->packet;
+ if (pid)
+ *pid = item->pid;
- ErrPrint("Error: %s\n", strerror(errno));
- status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- if (status != 0)
- ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
- return ret;
- } else if (ret == 0) {
- ErrPrint("Timeout expired\n");
- status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
- if (status != 0)
- ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
- return -ETIMEDOUT;
+ free(item);
}
- status = pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
+
+ status = pthread_mutex_unlock(&router->recv_packet_list_lock);
if (status != 0)
- ErrPrint("Failed to set cancelstate: %s\n", strerror(status));
+ ErrPrint("Failed to unlock: %s\n", strerror(errno));
- if (!FD_ISSET(handle, &set)) {
- ErrPrint("Unexpected handle is toggled\n");
- return -EINVAL;
- }
+ /*!
+ * \note
+ * Consuming an event from event pipe
+ * Even if we cannot get the packet(NULL), we should consuming event
+ * Because the NULL packet means disconnected
+ */
+ if (read(router->recv_pipe[PIPE_READ], handle, sizeof(*handle)) != sizeof(*handle))
+ ErrPrint("Failed to get an event: %s\n", strerror(errno));
- return 0;
+ return packet;
}
static inline int build_packet(int handle, struct recv_ctx *ctx)
}
if (ctx->state == RECV_STATE_READY) {
- if (packet_flag(ctx->packet) & PACKET_FLAG_ROUTE)
+ /*!
+ * \NOTE
+ *
+ * If the destination address is ZERO,
+ * Pull up the packet to this server.
+ */
+ if (packet_destination(ctx->packet))
route_packet(router, handle, ctx->packet);
else
- put_packet(router, handle, ctx->packet, ctx->pid);
+ put_recv_packet(router, handle, ctx->packet, ctx->pid);
ctx->state = RECV_STATE_INIT;
}
}
- put_packet(router, handle, NULL, ctx->pid);
+ put_recv_packet(router, handle, NULL, ctx->pid);
return ret;
}
* \NOTE
* Running thread: Server thread
*/
-static void *router_server_main(void *data)
+static void *server_main(void *data)
{
struct client *client = data;
struct router *router = client->router;
* \NOTE
* Running thread: Client thread
*/
-static void *router_client_main(void *data)
+static void *client_main(void *data)
{
struct router *router = data;
struct recv_ctx ctx;
client->router = router;
router->info.server.client_list = dlist_append(router->info.server.client_list, client);
- status = pthread_create(&client->thid, NULL, router_server_main, client);
+ status = pthread_create(&client->thid, NULL, server_main, client);
if (status != 0) {
ErrPrint("Thread creation failed: %s\n", strerror(status));
dlist_remove_data(router->info.server.client_list, client);
gio = g_io_channel_unix_new(router->handle);
if (!gio) {
+ handle = destroy_router(router);
secure_socket_destroy_handle(handle);
- destroy_router(router);
return -EIO;
}
}
g_io_channel_unref(gio);
+ handle = destroy_router(router);
secure_socket_destroy_handle(handle);
- destroy_router(router);
return -EIO;
}
router->timeout = timeout;
router->is_server = 0;
- status = pthread_mutex_init(&router->packet_list_lock, NULL);
+ status = pthread_mutex_init(&router->recv_packet_list_lock, NULL);
if (status != 0) {
ErrPrint("Mutex creation failed: %s\n", strerror(status));
+ handle = destroy_router(router);
secure_socket_destroy_handle(handle);
- destroy_router(router);
return -EFAULT;
}
status = pthread_mutex_init(&router->route_list_lock, NULL);
if (status != 0) {
ErrPrint("Mutex creation failed: %s\n", strerror(status));
+ handle = destroy_router(router);
secure_socket_destroy_handle(handle);
- destroy_router(router);
return -EFAULT;
}
- status = pthread_create(&router->info.client.thid, NULL, router_client_main, router);
+ status = pthread_create(&router->info.client.thid, NULL, client_main, router);
if (status != 0) {
ErrPrint("Thread creation failed: %s\n", strerror(status));
+ handle = destroy_router(router);
secure_socket_destroy_handle(handle);
- destroy_router(router);
return -EFAULT;
}
data = router->data;
- secure_socket_destroy_handle(router->handle);
-
- destroy_router(router);
+ handle = destroy_router(router);
+ secure_socket_destroy_handle(handle);
return data;
}
*/
EAPI int com_core_packet_router_async_send(int handle, struct packet *packet, double timeout, int (*recv_cb)(pid_t pid, int handle, const struct packet *packet, void *data), void *data)
{
- int ret;
struct request_ctx *ctx;
+ struct router *router;
+ int ret;
+
+ if (handle < 0 || !packet)
+ return -EINVAL;
if (packet_type(packet) != PACKET_REQ) {
ErrPrint("Invalid packet - should be PACKET_REQ\n");
return -EINVAL;
}
+ router = find_router_by_handle(handle);
+ if (!router) {
+ ErrPrint("Router is not available\n");
+ return -EINVAL;
+ }
+
ctx = create_request_ctx(handle);
if (!ctx)
return -ENOMEM;
ctx->data = data;
ctx->packet = packet_ref(packet);
- ret = com_core_send(handle, (void *)packet_data(packet), packet_size(packet), timeout);
- if (ret != packet_size(packet)) {
- ErrPrint("Send failed. %d <> %d (handle: %d)\n", ret, packet_size(packet), handle);
+ ret = put_send_packet(router, handle, packet);
+ if (ret < 0)
destroy_request_ctx(ctx);
- return -EIO;
- }
- return 0;
+ return ret;
}
/*!
*/
EAPI int com_core_packet_router_send_only(int handle, struct packet *packet)
{
- return com_core_packet_send_only(handle, packet);
+ struct router *router;
+
+ if (handle < 0 || !packet || packet_type(packet) != PACKET_REQ_NOACK)
+ return -EINVAL;
+
+ router = find_router_by_handle(handle);
+ if (!router) {
+ ErrPrint("Rouer is not available\n");
+ return -EINVAL;
+ }
+
+ return put_send_packet(router, handle, packet);
}
/*!
{
struct router *router;
struct route *route;
+ struct route *tmp;
int status;
+ struct dlist *l;
+ int found = 0;
if (handle < 0 || !address || h < 0)
return -EINVAL;
if (status != 0)
ErrPrint("Failed to lock: %s\n", strerror(status));
- router->route_list = dlist_append(router->route_list, route);
+ dlist_foreach(router->route_list, l, tmp) {
+ if (tmp->address == address) {
+ found = 1;
+ break;
+ }
+ }
+
+ if (!found)
+ router->route_list = dlist_append(router->route_list, route);
status = pthread_mutex_unlock(&router->route_list_lock);
if (status != 0)
ErrPrint("Failed to unlock: %s\n", strerror(status));
+ if (found) {
+ free(route);
+ return -EEXIST;
+ }
+
return 0;
}
struct dlist *l;
struct dlist *n;
int status;
+ int found = 0;
if (handle < 0 || !address)
return -EINVAL;
DbgPrint("Delete an entry from the table (%lu : %d)\n", route->address, route->handle);
free(route);
+
+ found = 1;
break;
}
if (status != 0)
ErrPrint("Failed to unlock: %s\n", strerror(status));
- return 0;
+ return found ? 0 : -ENOENT;
}
+/*!
+ * \NOTE
+ * Running thread: Main
+ */
EAPI int com_core_packet_router_update_route(int handle, unsigned long address, int h)
{
struct router *router;