Add support for asynchronous transfer on network transport 52/177852/11
authorDongwoo Lee <dwoo08.lee@samsung.com>
Thu, 28 Sep 2017 06:29:42 +0000 (15:29 +0900)
committerSeung-Woo Kim <sw0312.kim@samsung.com>
Thu, 31 May 2018 10:32:25 +0000 (19:32 +0900)
To improve performance of network transfer, add asynchronous
transfer with AIO APIs to network transport.

Change-Id: I91dbf1161aeebc03af2bccdbe4ed0e20a322949d
Signed-off-by: Dongwoo Lee <dwoo08.lee@samsung.com>
Signed-off-by: Seung-Woo Kim <sw0312.kim@samsung.com>
CMakeLists.txt
libthor/thor_net.c

index 4d67887ab9eabee4df182d8c90d7d53e0b71ef6b..e4f315cc142d7f664b7c876df7cf138d9881355e 100755 (executable)
@@ -66,7 +66,7 @@ ADD_LIBRARY(libthor ${LIBTHOR_SRCS})
 
 ADD_EXECUTABLE(${PROJECT_NAME} ${SRCS})
 
-TARGET_LINK_LIBRARIES(${PROJECT_NAME} libthor ${pkgs_LDFLAGS})
+TARGET_LINK_LIBRARIES(${PROJECT_NAME} libthor rt ${pkgs_LDFLAGS})
 
 
 INSTALL(TARGETS ${PROJECT_NAME} DESTINATION ${BINDIR})
index 08d479f29c78d9a536a1f977403d63b5b383e3a3..a3e8a8f200f244456ed5cacb195c781303ecfcdf 100644 (file)
@@ -4,6 +4,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/socket.h>
+#include <aio.h>
 
 #include "thor.h"
 #include "thor_internal.h"
@@ -135,26 +136,180 @@ static int thor_net_recv(thor_device_handle *th, unsigned char *buf,
        return 0;
 }
 
-static int t_thor_net_send_chunk(thor_device_handle *th, unsigned char *chunk,
-                                off_t size, int chunk_number)
-{
+struct t_thor_net_chunk {
+       struct net_device_handle *nh;
+       struct aiocb data_transfer;
+       struct aiocb resp_transfer;
+       void *user_data;
+       off_t useful_size;
        struct data_res_pkt resp;
+       unsigned char *buf;
+       off_t trans_unit_size;
+       int chunk_number;
+       int data_finished;
+       int resp_finished;
+};
+
+struct t_thor_net_transfer {
+       struct thor_device_handle *th;
+       struct thor_data_src *data;
+       thor_progress_cb report_progress;
+       void *user_data;
+       off_t data_left;
+       off_t data_sent;
+       off_t data_in_progress;
+       int chunk_number;
+       int completed;
        int ret;
+};
 
-       ret = thor_net_send(th, chunk, size, 0);
-       if (ret < 0)
-               return ret;
+static int t_thor_submit_chunk(struct t_thor_net_chunk *chunk)
+{
+       struct t_thor_net_transfer *transfer = chunk->user_data;
+       int ret;
+
+       chunk->data_finished = chunk->resp_finished = 0;
+
+       ret = aio_write(&chunk->data_transfer);
+       if (ret)
+               goto out;
+
+       memset(&chunk->resp, 0, DATA_RES_PKT_SIZE);
+       ret = aio_read(&chunk->resp_transfer);
+       if (ret)
+               goto cancel_data_transfer;
+
+       return 0;
+cancel_data_transfer:
+       aio_cancel(chunk->nh->sock_fd, &chunk->data_transfer);
+out:
+       return ret;
+}
 
-       memset(&resp, 0, DATA_RES_PKT_SIZE);
+static int t_thor_prep_next_chunk(struct t_thor_net_chunk *chunk,
+                                 struct t_thor_net_transfer *transfer)
+{
+       off_t to_read;
+       int ret;
+
+       to_read = transfer->data_left - transfer->data_in_progress;
+       if (to_read <= 0) {
+               printf("to big data in progress\n");
+               fflush(stdout);
+               return -EINVAL;
+       }
 
-       ret = thor_net_recv(th, (unsigned char *)&resp, DATA_RES_PKT_SIZE, 0);
-       if (ret < 0)
+       chunk->useful_size = to_read > chunk->trans_unit_size ?
+                            chunk->trans_unit_size : to_read;
+
+       ret = transfer->data->get_block(transfer->data,
+                                       chunk->buf, chunk->useful_size);
+       if (ret < 0 || ret != chunk->useful_size)
                return ret;
 
-       if (resp.cnt != chunk_number)
-               return -EIO;
+       memset(chunk->buf + chunk->useful_size, 0,
+              chunk->trans_unit_size - chunk->useful_size);
+       chunk->chunk_number = transfer->chunk_number++;
+
+       ret = t_thor_submit_chunk(chunk);
+       if (!ret)
+               transfer->data_in_progress += chunk->useful_size;
+
+       return ret;
+}
+
+static void check_next_chunk(struct t_thor_net_chunk *chunk,
+                            struct t_thor_net_transfer *transfer)
+{
+       int ret;
 
-       return resp.ack;
+       if (transfer->data_left - transfer->data_in_progress) {
+               ret = t_thor_prep_next_chunk(chunk, transfer);
+               if (ret) {
+                       transfer->ret = ret;
+                       transfer->completed = 1;
+               }
+       } else {
+               if (transfer->data_in_progress == 0)
+                       transfer->completed = 1;
+       }
+}
+
+static void data_finished(sigval_t sigval)
+{
+       struct t_thor_net_chunk *chunk = sigval.sival_ptr;
+       struct t_thor_net_transfer *transfer = chunk->user_data;
+       int ret;
+
+       chunk->data_finished = 1;
+
+       ret = aio_error(&chunk->data_transfer);
+       if (ret == ECANCELED || transfer->ret) {
+               return;
+       } else if (ret < 0) {
+               transfer->ret = ret;
+               transfer->completed = 1;
+               return;
+       }
+
+       ret = aio_return(&chunk->data_transfer);
+       if (ret < chunk->data_transfer.aio_nbytes) {
+               transfer->ret = -EIO;
+               transfer->completed = 1;
+               return;
+       }
+
+       if (chunk->resp_finished)
+               check_next_chunk(chunk, transfer);
+}
+
+static void resp_finished(sigval_t sigval)
+{
+       struct t_thor_net_chunk *chunk = sigval.sival_ptr;
+       struct t_thor_net_transfer *transfer = chunk->user_data;
+       int ret;
+
+       chunk->resp_finished = 1;
+       transfer->data_in_progress -= chunk->useful_size;
+
+       ret = aio_error(&chunk->resp_transfer);
+       if (ret == ECANCELED || transfer->ret) {
+               return;
+       } else if (ret < 0) {
+               transfer->ret = ret;
+               transfer->completed = 1;
+               return;
+       }
+
+       ret = aio_return(&chunk->resp_transfer);
+       if (ret < chunk->resp_transfer.aio_nbytes) {
+               transfer->ret = -EIO;
+               transfer->completed = 1;
+               return;
+       }
+
+       if (chunk->resp.cnt != chunk->chunk_number) {
+               fprintf(stderr, "chunk number mismatch: %d != %d\n",
+                       chunk->resp.cnt, chunk->chunk_number);
+               fflush(stderr);
+               transfer->ret = -EINVAL;
+               transfer->completed = 1;
+               return;
+       }
+
+       transfer->data_sent += chunk->useful_size;
+       transfer->data_left -= chunk->useful_size;
+
+       if (transfer->report_progress)
+               transfer->report_progress(transfer->th,
+                                         transfer->data,
+                                         transfer->data_sent,
+                                         transfer->data_left,
+                                         chunk->chunk_number,
+                                         transfer->user_data);
+
+       if (chunk->data_finished)
+               check_next_chunk(chunk, transfer);
 }
 
 static int thor_net_send_raw_data(thor_device_handle *th,
@@ -164,54 +319,80 @@ static int thor_net_send_raw_data(thor_device_handle *th,
                                  void *user_data)
 {
        struct net_device_handle *nh = th->dev_priv;
-       unsigned char *chunk;
-       off_t data_left;
-       off_t size;
-       off_t data_sent = 0;
-       int chunk_number = 1;
+       struct t_thor_net_transfer transfer;
+       struct t_thor_net_chunk chunk;
        int ret;
 
        if (!nh)
                return -ENODEV;
 
-       chunk = malloc(trans_unit_size);
-       if (!chunk)
+       /* Init transfer */
+       transfer.th = th;
+       transfer.data = data;
+       transfer.report_progress = report_progress;
+       transfer.user_data = user_data;
+       transfer.data_left = data->get_file_length(data);
+       transfer.data_sent = 0;
+       transfer.chunk_number = 1;
+       transfer.completed = 0;
+       transfer.data_in_progress = 0;
+       transfer.ret = 0;
+
+       /* Init chunk */
+       chunk.user_data = &transfer;
+       chunk.useful_size = 0;
+       chunk.trans_unit_size = trans_unit_size;
+       chunk.buf = malloc(trans_unit_size);
+       if (!chunk.buf)
                return -ENOMEM;
 
-       data_left = data->get_file_length(data);
+       chunk.data_transfer.aio_fildes = nh->sock_fd;
+       chunk.data_transfer.aio_buf = chunk.buf;
+       chunk.data_transfer.aio_nbytes = chunk.trans_unit_size;
+       chunk.data_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD;
+       chunk.data_transfer.aio_sigevent.sigev_notify_function = data_finished;
+       chunk.data_transfer.aio_sigevent.sigev_notify_attributes = NULL;
+       chunk.data_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk;
+
+       chunk.resp_transfer.aio_fildes = nh->sock_fd;
+       chunk.resp_transfer.aio_buf = &chunk.resp;
+       chunk.resp_transfer.aio_nbytes = DATA_RES_PKT_SIZE;
+       chunk.resp_transfer.aio_sigevent.sigev_notify = SIGEV_THREAD;
+       chunk.resp_transfer.aio_sigevent.sigev_notify_function = resp_finished;
+       chunk.resp_transfer.aio_sigevent.sigev_notify_attributes = NULL;
+       chunk.resp_transfer.aio_sigevent.sigev_value.sival_ptr = &chunk;
+
+       ret = t_thor_prep_next_chunk(&chunk, &transfer);
+       if (ret)
+               goto cancel_chunks;
 
-       while (data_left) {
-               size = data_left > trans_unit_size ?
-                      trans_unit_size : data_left;
+       while (!transfer.completed) {
+               int err;
 
-               ret = data->get_block(data, chunk, size);
-               if (ret < 0 || ret != size)
-                       goto cleanup;
+               err = aio_error(&chunk.data_transfer);
+               if (err < 0)
+                       break;
 
-               memset(chunk + size, 0, trans_unit_size - size);
-               if (th) {
-                       ret = t_thor_net_send_chunk(th, chunk, trans_unit_size,
-                                                   chunk_number);
-                       if (ret)
-                               goto cleanup;
-               }
+               err = aio_error(&chunk.resp_transfer);
+               if (err < 0)
+                       break;
+       }
 
-               data_sent += size;
-               data_left -= size;
-               ++chunk_number;
-               if (report_progress)
-                       report_progress(NULL, data, data_sent, data_left,
-                                       chunk_number, user_data);
+       if (transfer.data_in_progress) {
+               ret = transfer.ret;
+               goto cancel_chunks;
        }
 
-       ret = 0;
+       free(chunk.buf);
+       return transfer.ret;
 
-cleanup:
-       free(chunk);
+cancel_chunks:
+       aio_cancel(nh->sock_fd, NULL);
+
+       free(chunk.buf);
        return ret;
 }
 
-
 static struct thor_transport_ops thor_net_ops = {
        .open = thor_net_open,
        .close = thor_net_close,