Fix deadlock with TIDL operation 22/321822/11
authorJeongmo Yang <jm80.yang@samsung.com>
Thu, 27 Mar 2025 06:34:26 +0000 (15:34 +0900)
committerJeongmo Yang <jm80.yang@samsung.com>
Thu, 3 Apr 2025 02:15:04 +0000 (11:15 +0900)
- Update TIDL interface
- Add new thread for TIDL delegate callback
- Add new TIDL delegate callback for async return value

[Version] 1.2.0
[Issue Type] Improvement

Change-Id: I8f42ee0865c10cd62add4e5527a97052cf9fa886
Signed-off-by: Jeongmo Yang <jm80.yang@samsung.com>
include/hal_codec_ipc_1.tidl
packaging/hal-api-codec.spec
src/hal-api-codec-ipc.c
src/service_plugin/hal-backend-service-codec.c

index 7e44634a509108ecdb8a2f4bba750f29ab42d19a..755e0a93be33c1cb1f727cb33e2beb7eb0090670 100644 (file)
@@ -1,15 +1,25 @@
 protocol 2
 
 interface codec {
+  enum async_return_type {
+    NONE = 0,
+    DECODE,
+    ENCODE,
+    RELEASE_OUTPUT_BUFFER,
+    FLUSH,
+    STOP
+  }
+
   void message(bundle msg, array<file_desc> fd) delegate;
+  void async_return(async_return_type type, int ret) delegate;
 
-  int init(int type);
+  int init(int type, message msg_cb, async_return ret_cb);
   int deinit();
 
   int configure(int width, int height, int in_format, int out_format, bool is_secure);
   int release();
 
-  int start(message callback);
+  int start();
   int stop();
   int flush();
 
index a1779c476c9cf22ca658c8f2dca8c42a986a48d7..f59f49d2ff24202e7031a84eb9227fd3dc5257fa 100644 (file)
@@ -6,7 +6,7 @@
 ### main package #########
 Name:       %{name}
 Summary:    %{name} interface
-Version:    1.1.3
+Version:    1.2.0
 Release:    0
 Group:      Development/Libraries
 License:    Apache-2.0
index 2c15074c1f9bd410b6de6bc8e1d7e1e2f5caf643..a46e384d4288a7bbb28bcddb3976bd5d7c3a8306 100644 (file)
  * limitations under the License.
  */
 
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <unistd.h>
 #include <sys/mman.h>
 #include <stdio.h>
 #include <stdint.h>
-#include <unistd.h>
 #include <dlfcn.h>
 #include <dlog.h>
 #include <glib.h>
 #include "hal-codec-ipc.h"
 #include "hal_codec_proxy_1.h"
 
+
 #ifdef LOG_TAG
 #undef LOG_TAG
 #endif
 #define LOG_TAG "HALAPI_CODEC_IPC"
 
 #define HAL_CODEC_IPC_CONNECT_COUNT_MAX     10
+#define HAL_CODEC_IPC_CONNECT_TRY_SLEEP_US  100000
 
 
 #define HAL_CODEC_RETURN_IF_FAILED(arg) \
                }\
        } while (0)
 
-typedef struct _hal_codec_ipc_s {
-       rpc_port_proxy_codec_h rpc_handle;
-       rpc_port_proxy_codec_message_h rpc_msg_handle;
+#define ASYNC_RETURN_TYPE_MAX       (RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_STOP + 1)
+
+
+
+typedef struct _hal_codec_ipc_message_s {
+       rpc_port_proxy_codec_message_h handle;
+       hal_codec_message_cb user_cb;
+       void *user_cb_data;
+       GMutex lock;
+} hal_codec_ipc_message_s;
 
-       hal_codec_message_cb msg_cb;
-       void *msg_cb_data;
 
-       GMutex buffer_lock;
-       GMutex msg_lock;
+typedef struct _hal_codec_ipc_async_return_s {
+       rpc_port_proxy_codec_async_return_h handle;
+       int result[ASYNC_RETURN_TYPE_MAX];
+       bool wait_result[ASYNC_RETURN_TYPE_MAX];
+       GMutex lock[ASYNC_RETURN_TYPE_MAX];
+       GCond cond[ASYNC_RETURN_TYPE_MAX];
+} hal_codec_ipc_async_return_s;
 
+
+typedef struct _hal_codec_ipc_context_s {
+       GThread *thread;
+       GMainContext *context;
+       GMainLoop *loop;
+       GMutex lock;
+       GCond cond;
+       gboolean running;
+} hal_codec_ipc_context_s;
+
+
+typedef struct _hal_codec_ipc_buffer_s {
+       uint32_t input_buffer_count;
        hal_codec_buffer_s *input_buffers[HAL_CODEC_BUFFER_MAX];
+       uint32_t output_buffer_count;
        hal_codec_buffer_s *output_buffers[HAL_CODEC_BUFFER_MAX];
+       GMutex lock;
+       GCond cond;
+} hal_codec_ipc_buffer_s;
+
+
+typedef struct _hal_codec_ipc_s {
+       rpc_port_proxy_codec_h rpc_handle;
+       bool is_flushing;
+
+       /* buffer */
+       hal_codec_ipc_buffer_s ipc_buffer;
+
+       /* message callback */
+       hal_codec_ipc_message_s ipc_message;
+
+       /* async return callback */
+       hal_codec_ipc_async_return_s ipc_async_return;
+
+       /* context for msg thread */
+       hal_codec_ipc_context_s ipc_context;
 } hal_codec_ipc_s;
 
 
@@ -132,9 +182,13 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
        size_t mmap_length = 0;
        size_t hal_message_size = 0;
        size_t hal_buffer_size = 0;
+
        hal_codec_message_s *hal_message = NULL;
        hal_codec_buffer_s *hal_buffer = NULL;
+
        hal_codec_ipc_s *handle = (hal_codec_ipc_s *)user_data;
+       hal_codec_ipc_buffer_s *ipc_buffer = NULL;
+       hal_codec_ipc_message_s *ipc_message = NULL;
 
        HAL_CODEC_RETURN_IF_FAILED(handle);
        HAL_CODEC_RETURN_IF_FAILED(b_msg);
@@ -149,6 +203,9 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
                return;
        }
 
+       ipc_buffer = &handle->ipc_buffer;
+       ipc_message = &handle->ipc_message;
+
        SLOGD("message: type[%d]", hal_message->type);
 
        switch (hal_message->type) {
@@ -156,30 +213,39 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
                ipc_ret = bundle_get_byte(b_msg, HAL_CODEC_IPC_PARAM_KEY_BUFFER, (void **)&hal_buffer, &hal_buffer_size);
                if (ipc_ret != BUNDLE_ERROR_NONE ||
                        hal_buffer_size != sizeof(hal_codec_buffer_s)) {
-                       SLOGE("get message from bundle failed[0x%x], size[%zu:%zu]",
+                       SLOGE("[INPUT][USED] get message from bundle failed[0x%x], size[%zu:%zu]",
                                ipc_ret, hal_message_size, sizeof(hal_codec_message_s));
                        goto _CODEC_IPC_MESSAGE_CB_DONE;
                }
 
                if (hal_buffer->index >= HAL_CODEC_BUFFER_MAX) {
-                       SLOGE("[INPUT_USED] invalid buffer index[%d]", hal_buffer->index);
+                       SLOGE("[INPUT][USED] invalid buffer index[%d]", hal_buffer->index);
                        goto _CODEC_IPC_MESSAGE_CB_DONE;
                }
 
-               g_mutex_lock(&handle->buffer_lock);
+               g_mutex_lock(&ipc_buffer->lock);
 
-               if (!handle->input_buffers[hal_buffer->index]) {
-                       SLOGE("[INPUT_USED] NULL buffer for index[%d]", hal_buffer->index);
-                       g_mutex_unlock(&handle->buffer_lock);
+               if (!ipc_buffer->input_buffers[hal_buffer->index]) {
+                       SLOGE("[INPUT][USED] NULL buffer for index[%d]", hal_buffer->index);
+                       g_mutex_unlock(&ipc_buffer->lock);
                        goto _CODEC_IPC_MESSAGE_CB_DONE;
                }
 
-               hal_message->buffer = handle->input_buffers[hal_buffer->index];
-               handle->input_buffers[hal_buffer->index] = NULL;
+               hal_message->buffer = ipc_buffer->input_buffers[hal_buffer->index];
 
-               SLOGD("check index [%d] vs [%d]", hal_message->buffer->index, hal_buffer->index);
+               if (ipc_buffer->input_buffer_count != 0) {
+                       SLOGD("[INPUT][USED] buffer[%d] %p, count[%u -> %u]",
+                               hal_buffer->index, hal_message->buffer,
+                               ipc_buffer->input_buffer_count, ipc_buffer->input_buffer_count - 1);
 
-               g_mutex_unlock(&handle->buffer_lock);
+                       ipc_buffer->input_buffers[hal_buffer->index] = NULL;
+                       ipc_buffer->input_buffer_count--;
+               } else {
+                       SLOGW("[INPUT][USED] buffer[%d] %p, but count is already zero",
+                               hal_buffer->index, hal_message->buffer);
+               }
+
+               g_mutex_unlock(&ipc_buffer->lock);
 
                break;
 
@@ -187,7 +253,7 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
                ipc_ret = bundle_get_byte(b_msg, HAL_CODEC_IPC_PARAM_KEY_BUFFER, (void **)&b_data, &hal_buffer_size);
                if (ipc_ret != BUNDLE_ERROR_NONE ||
                        hal_buffer_size != sizeof(hal_codec_buffer_s)) {
-                       SLOGE("get message from bundle failed[0x%x], size[%zu:%zu]",
+                       SLOGE("[OUTPUT] get message from bundle failed[0x%x], size[%zu:%zu]",
                                ipc_ret, hal_message_size, sizeof(hal_codec_message_s));
                        goto _CODEC_IPC_MESSAGE_CB_DONE;
                }
@@ -197,7 +263,7 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
                ipc_ret = rpc_port_proxy_array_file_desc_get(fd_handle, &fd, &fd_size);
                if (ipc_ret != RPC_PORT_ERROR_NONE ||
                        hal_buffer->memory.num_fd != (uint32_t)fd_size) {
-                       SLOGE("get fd failed[0x%x], num_fd[%u] vs fd_size[%d]",
+                       SLOGE("[OUTPUT] get fd failed[0x%x], num_fd[%u] vs fd_size[%d]",
                                ipc_ret, hal_buffer->memory.num_fd, fd_size);
                        goto _CODEC_IPC_MESSAGE_CB_DONE;
                }
@@ -216,32 +282,38 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
                                MAP_SHARED, fd[i], 0);
                        if (hal_buffer->planes.plane[i].data == MAP_FAILED) {
                                hal_buffer->planes.plane[i].data = NULL;
-                               SLOGE("  mmap failed for fd[%d] %d", i, fd[i]);
+                               SLOGE("[OUTPUT]   mmap failed for fd[%d] %d", i, fd[i]);
                        } else {
-                               SLOGD("plane[%d] data %p", i, hal_buffer->planes.plane[i].data);
+                               SLOGD("[OUTPUT] plane[%d] data %p", i, hal_buffer->planes.plane[i].data);
                        }
                }
 
                if (fd_size != (int)hal_buffer->planes.num_planes) {
                        for (i = 1 ; i < (int)hal_buffer->planes.num_planes ; i++) {
                                hal_buffer->planes.plane[i].data = hal_buffer->planes.plane[i-1].data + hal_buffer->planes.plane[i-1].size;
-                               SLOGD("plane[%d] data %p", i, hal_buffer->planes.plane[i].data);
+                               SLOGD("[OUTPUT] plane[%d] data %p", i, hal_buffer->planes.plane[i].data);
                        }
                }
 
-               g_mutex_lock(&handle->buffer_lock);
+               g_mutex_lock(&ipc_buffer->lock);
 
-               if (handle->output_buffers[hal_buffer->index]) {
-                       SLOGE("duplicated buffer index[%d]", hal_buffer->index);
+               if (ipc_buffer->output_buffers[hal_buffer->index]) {
+                       SLOGE("[OUTPUT] duplicated buffer index[%d]", hal_buffer->index);
                        g_free(hal_buffer);
-                       g_mutex_unlock(&handle->buffer_lock);
+                       g_mutex_unlock(&ipc_buffer->lock);
                        goto _CODEC_IPC_MESSAGE_CB_DONE;
                }
 
                hal_message->buffer = hal_buffer;
-               handle->output_buffers[hal_buffer->index] = hal_buffer;
+               ipc_buffer->output_buffers[hal_buffer->index] = hal_buffer;
+
+               ipc_buffer->output_buffer_count++;
+
+               SLOGD("[OUTPUT] buffer[%d] %p, count[%u -> %u]",
+                       hal_buffer->index, hal_buffer,
+                       ipc_buffer->output_buffer_count - 1, ipc_buffer->output_buffer_count);
 
-               g_mutex_unlock(&handle->buffer_lock);
+               g_mutex_unlock(&ipc_buffer->lock);
 
                /* The fd will be closed in release_output_buffer via hal_buffer. */
                fd = NULL;
@@ -255,16 +327,16 @@ static void __hal_codec_ipc_message_cb(void *user_data, bundle *b_msg, rpc_port_
                break;
        }
 
-       g_mutex_lock(&handle->msg_lock);
+       g_mutex_lock(&ipc_message->lock);
 
-       if (handle->msg_cb) {
+       if (ipc_message->user_cb) {
                SLOGD(">>>>> msg cb: type[%d]", hal_message->type);
-               ((hal_codec_message_cb)handle->msg_cb)(hal_message, handle->msg_cb_data);
+               ((hal_codec_message_cb)ipc_message->user_cb)(hal_message, ipc_message->user_cb_data);
                SLOGD("<<<<< msg cb: type[%d]", hal_message->type);
 
-               g_mutex_unlock(&handle->msg_lock);
+               g_mutex_unlock(&ipc_message->lock);
        } else {
-               g_mutex_unlock(&handle->msg_lock);
+               g_mutex_unlock(&ipc_message->lock);
 
                SLOGW("no msg cb for handle[%p]", handle);
 
@@ -283,70 +355,346 @@ _CODEC_IPC_MESSAGE_CB_DONE:
 }
 
 
-int hal_codec_ipc_init(hal_codec_type_e type, void **codec_handle)
+static void __hal_codec_ipc_async_return_cb(void *user_data,
+       rpc_port_proxy_codec_async_return_type_e type, int ret)
+{
+       hal_codec_ipc_s *handle = (hal_codec_ipc_s *)user_data;
+       hal_codec_ipc_async_return_s *ipc_async_return = NULL;
+
+       HAL_CODEC_RETURN_IF_FAILED(handle);
+
+       if (type > RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_STOP) {
+               SLOGE("invalid return type[%d], ret[0x%x]", type, ret);
+               return;
+       }
+
+       ipc_async_return = &handle->ipc_async_return;
+
+       SLOGD("type[%d]: ret[0x%x]", type, ret);
+
+       g_mutex_lock(&ipc_async_return->lock[type]);
+
+       ipc_async_return->result[type] = ret;
+       g_cond_broadcast(&ipc_async_return->cond[type]);
+
+       g_mutex_unlock(&ipc_async_return->lock[type]);
+}
+
+
+static int __hal_codec_ipc_cb_handle_new(hal_codec_ipc_s *handle)
+{
+       int i = 0;
+       int ret = HAL_CODEC_ERROR_NONE;
+       int ipc_ret = RPC_PORT_ERROR_NONE;
+
+       hal_codec_ipc_message_s *ipc_message = NULL;
+       hal_codec_ipc_async_return_s *ipc_async_return = NULL;
+
+       rpc_port_proxy_codec_message_h msg_handle = NULL;
+       rpc_port_proxy_codec_async_return_h async_return_handle = NULL;
+
+       HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+
+       /* message callback */
+       ipc_ret = rpc_port_proxy_codec_message_create(&msg_handle);
+       if (ipc_ret != RPC_PORT_ERROR_NONE) {
+               SLOGE("msg handle create failed[0x%x]", ipc_ret);
+               return HAL_CODEC_ERROR_OUT_OF_MEMORY;
+       }
+
+       ipc_ret = rpc_port_proxy_codec_message_set_callback(msg_handle,
+               __hal_codec_ipc_message_cb, handle);
+       if (ipc_ret != RPC_PORT_ERROR_NONE) {
+               SLOGE("set msg callback failed[0x%x]", ipc_ret);
+               ret = HAL_CODEC_ERROR_INTERNAL;
+               goto _CB_HANDLE_NEW_FAILED;
+       }
+
+       rpc_port_proxy_codec_message_set_once(msg_handle, false);
+
+       /* async return callback */
+       ipc_ret = rpc_port_proxy_codec_async_return_create(&async_return_handle);
+       if (ipc_ret != RPC_PORT_ERROR_NONE) {
+               SLOGE("async return handle create failed[0x%x]", ipc_ret);
+               ret = HAL_CODEC_ERROR_OUT_OF_MEMORY;
+               goto _CB_HANDLE_NEW_FAILED;
+       }
+
+       ipc_ret = rpc_port_proxy_codec_async_return_set_callback(async_return_handle,
+               __hal_codec_ipc_async_return_cb, handle);
+       if (ipc_ret != RPC_PORT_ERROR_NONE) {
+               SLOGE("set async return callback failed[0x%x]", ipc_ret);
+               ret = HAL_CODEC_ERROR_INTERNAL;
+               goto _CB_HANDLE_NEW_FAILED;
+       }
+
+       rpc_port_proxy_codec_async_return_set_once(async_return_handle, false);
+
+       ipc_message = &handle->ipc_message;
+       ipc_async_return = &handle->ipc_async_return;
+
+       g_mutex_init(&ipc_message->lock);
+
+       for (i = 0 ; i < ASYNC_RETURN_TYPE_MAX ; i++) {
+               g_mutex_init(&ipc_async_return->lock[i]);
+               g_cond_init(&ipc_async_return->cond[i]);
+       }
+
+       SLOGI("new cb handle: msg[%p], async_return[%p]", msg_handle, async_return_handle);
+
+       ipc_message->handle = msg_handle;
+       ipc_async_return->handle = async_return_handle;
+
+       return HAL_CODEC_ERROR_NONE;
+
+_CB_HANDLE_NEW_FAILED:
+       if (msg_handle)
+               rpc_port_proxy_codec_message_destroy(msg_handle);
+
+       if (async_return_handle)
+               rpc_port_proxy_codec_async_return_destroy(async_return_handle);
+
+       return ret;
+}
+
+
+static void __hal_codec_ipc_cb_handle_release(hal_codec_ipc_s *handle)
+{
+       int i = 0;
+       int ipc_ret = 0;
+
+       hal_codec_ipc_message_s *ipc_message = NULL;
+       hal_codec_ipc_async_return_s *ipc_async_return = NULL;
+
+       HAL_CODEC_RETURN_IF_FAILED(handle);
+
+       ipc_message = &handle->ipc_message;
+       ipc_async_return = &handle->ipc_async_return;
+
+       if (ipc_async_return->handle) {
+               SLOGI("async return handle[%p]", ipc_async_return->handle);
+
+               ipc_ret = rpc_port_proxy_codec_async_return_dispose(handle->rpc_handle, ipc_async_return->handle);
+               if (ipc_ret != RPC_PORT_ERROR_NONE) {
+                       SLOGE("async return handle dispose failed[0x%x], try to destroy", ipc_ret);
+                       rpc_port_proxy_codec_async_return_destroy(ipc_async_return->handle);
+               }
+
+               ipc_async_return->handle = NULL;
+       }
+
+       if (ipc_message->handle) {
+               SLOGI("msg handle[%p]", ipc_message->handle);
+
+               ipc_ret = rpc_port_proxy_codec_message_dispose(handle->rpc_handle, ipc_message->handle);
+               if (ipc_ret != RPC_PORT_ERROR_NONE) {
+                       SLOGE("msg handle dispose failed[0x%x], try to destroy", ipc_ret);
+                       rpc_port_proxy_codec_message_destroy(ipc_message->handle);
+               }
+
+               ipc_message->handle = NULL;
+       }
+
+       g_mutex_clear(&ipc_message->lock);
+
+       for (i = 0 ; i < ASYNC_RETURN_TYPE_MAX ; i++) {
+               g_mutex_clear(&ipc_async_return->lock[i]);
+               g_cond_clear(&ipc_async_return->cond[i]);
+       }
+}
+
+
+static gpointer __hal_codec_ipc_context_func(gpointer data)
 {
        int ret = 0;
+       int ipc_ret = 0;
        int connect_count = 0;
-       hal_codec_ipc_s *new_handle = NULL;
+       const char *stub_proc_name = NULL;
+
+       hal_codec_ipc_s *handle = (hal_codec_ipc_s *)data;
+       hal_codec_ipc_context_s *ipc_context = NULL;
+
        rpc_port_proxy_codec_callback_s rpc_callbacks = {
                .connected = __hal_codec_ipc_connected,
                .disconnected = __hal_codec_ipc_disconnected,
                .rejected = __hal_codec_ipc_rejected
        };
-       const char *stub_proc_name = NULL;
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(codec_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+       HAL_CODEC_RETURN_VAL_IF_FAILED(handle, NULL);
 
-       SLOGI("start");
+       SLOGI("start: handle[%p]", handle);
+
+       ipc_context = &handle->ipc_context;
+
+       ipc_context->context = g_main_context_new();
+       ipc_context->loop = g_main_loop_new(ipc_context->context, FALSE);
+
+       g_main_context_push_thread_default(ipc_context->context);
 
        ret = hal_common_get_stub_proc_name(HAL_MODULE_CODEC, &stub_proc_name);
        if (ret != 0) {
                SLOGE("Failed to get stub proc name for codec: ret(%d)", ret);
-               return ret;
+               goto _IPC_CONTEXT_FUNC_EXIT;
        }
 
-       new_handle = g_new0(hal_codec_ipc_s, 1);
+       SLOGI("start - stub proc name[%s]", stub_proc_name);
 
-       ret = rpc_port_proxy_codec_create(stub_proc_name, &rpc_callbacks, NULL,
-                       &new_handle->rpc_handle);
-       if (ret != RPC_PORT_ERROR_NONE) {
-               SLOGE("RPC handle failed[0x%x]", ret);
-               g_free(new_handle);
-               return HAL_CODEC_ERROR_INTERNAL;
+       ipc_ret = rpc_port_proxy_codec_create(stub_proc_name, &rpc_callbacks, NULL,
+                       &handle->rpc_handle);
+       if (ipc_ret != RPC_PORT_ERROR_NONE) {
+               SLOGE("RPC handle failed[0x%x]", ipc_ret);
+               goto _IPC_CONTEXT_FUNC_EXIT;
        }
 
        while (connect_count++ < HAL_CODEC_IPC_CONNECT_COUNT_MAX) {
-               ret = rpc_port_proxy_codec_connect_sync(new_handle->rpc_handle);
-               if (ret == RPC_PORT_ERROR_NONE)
+               ipc_ret = rpc_port_proxy_codec_connect_sync(handle->rpc_handle);
+               if (ipc_ret == RPC_PORT_ERROR_NONE)
                        break;
 
-               SLOGW("RPC connect failed: ret[0x%x], retry(%d)...", ret, connect_count);
+               SLOGW("RPC connect failed: ret[0x%x], retry(%d)...", ipc_ret, connect_count);
 
-               g_usleep(100000);
+               usleep(HAL_CODEC_IPC_CONNECT_TRY_SLEEP_US);
        }
 
-       if (ret != RPC_PORT_ERROR_NONE) {
-               rpc_port_proxy_codec_destroy(new_handle->rpc_handle);
-               g_free(new_handle);
-               return HAL_CODEC_ERROR_INTERNAL;
+       if (ipc_ret != RPC_PORT_ERROR_NONE)
+               goto _IPC_CONTEXT_FUNC_EXIT;
+
+       ret = __hal_codec_ipc_cb_handle_new(handle);
+       if (ret != HAL_CODEC_ERROR_NONE)
+               goto _IPC_CONTEXT_FUNC_EXIT;
+
+       g_mutex_lock(&ipc_context->lock);
+
+       SLOGI("set running: TRUE");
+
+       ipc_context->running = TRUE;
+       g_cond_broadcast(&ipc_context->cond);
+
+       g_mutex_unlock(&ipc_context->lock);
+
+       g_main_loop_run(ipc_context->loop);
+
+_IPC_CONTEXT_FUNC_EXIT:
+       SLOGI("going to release");
+
+       __hal_codec_ipc_cb_handle_release(handle);
+
+       if (handle->rpc_handle) {
+               rpc_port_proxy_codec_disconnect(handle->rpc_handle);
+               rpc_port_proxy_codec_destroy(handle->rpc_handle);
+               handle->rpc_handle = NULL;
        }
 
-       ret = rpc_port_proxy_codec_invoke_init(new_handle->rpc_handle, type);
-       if (ret != RPC_PORT_ERROR_NONE) {
+       g_main_context_pop_thread_default(ipc_context->context);
+
+       g_main_loop_unref(ipc_context->loop);
+       ipc_context->loop = NULL;
+       g_main_context_unref(ipc_context->context);
+       ipc_context->context = NULL;
+
+       return NULL;
+}
+
+
+static int __hal_codec_ipc_context_new(hal_codec_ipc_s *handle)
+{
+       hal_codec_ipc_context_s *ipc_context = NULL;
+
+       HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+
+       ipc_context = &handle->ipc_context;
+
+       g_mutex_init(&ipc_context->lock);
+       g_cond_init(&ipc_context->cond);
+
+       g_mutex_lock(&ipc_context->lock);
+
+       ipc_context->running = FALSE;
+       ipc_context->thread = g_thread_new("HAL:Codec:MSG",
+               __hal_codec_ipc_context_func, (gpointer)handle);
+
+       SLOGI("new msg thread[%p]", ipc_context->thread);
+
+       while (!ipc_context->running) {
+               SLOGI("wait for msg thread running");
+               g_cond_wait(&ipc_context->cond, &ipc_context->lock);
+       }
+
+       g_mutex_unlock(&ipc_context->lock);
+
+       return HAL_CODEC_ERROR_NONE;
+}
+
+
+static int __hal_codec_ipc_context_release(hal_codec_ipc_s *handle)
+{
+       hal_codec_ipc_context_s *ipc_context = NULL;
+
+       HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+
+       ipc_context = &handle->ipc_context;
+
+       HAL_CODEC_RETURN_VAL_IF_FAILED(ipc_context->thread, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_RETURN_VAL_IF_FAILED(ipc_context->loop, HAL_CODEC_ERROR_INTERNAL);
+
+       g_main_loop_quit(ipc_context->loop);
+
+       SLOGI("join msg thread[%p]", ipc_context->thread);
+
+       g_thread_join(ipc_context->thread);
+       ipc_context->thread = NULL;
+
+       g_mutex_clear(&ipc_context->lock);
+       g_cond_clear(&ipc_context->cond);
+
+       SLOGI("done");
+
+       return HAL_CODEC_ERROR_NONE;
+}
+
+
+int hal_codec_ipc_init(hal_codec_type_e type, void **codec_handle)
+{
+       int ret = HAL_CODEC_ERROR_NONE;
+       int ipc_ret = 0;
+       hal_codec_ipc_s *new_handle = NULL;
+
+       HAL_CODEC_RETURN_VAL_IF_FAILED(codec_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+
+       new_handle = g_new0(hal_codec_ipc_s, 1);
+
+       ret = __hal_codec_ipc_context_new(new_handle);
+       if (ret != HAL_CODEC_ERROR_NONE)
+               goto _INIT_FAILED;
+
+       ipc_ret = rpc_port_proxy_codec_invoke_init(new_handle->rpc_handle, type,
+               new_handle->ipc_message.handle, new_handle->ipc_async_return.handle);
+       if (ipc_ret != RPC_PORT_ERROR_NONE) {
                SLOGE("invoke init failed[0x%x]", ret);
-               rpc_port_proxy_codec_destroy(new_handle->rpc_handle);
-               g_free(new_handle);
-               return HAL_CODEC_ERROR_INTERNAL;
+               ret = HAL_CODEC_ERROR_INTERNAL;
+               goto _INIT_FAILED;
        }
 
-       g_mutex_init(&new_handle->buffer_lock);
-       g_mutex_init(&new_handle->msg_lock);
+       g_mutex_init(&new_handle->ipc_buffer.lock);
+       g_cond_init(&new_handle->ipc_buffer.cond);
 
        *codec_handle = (void *)new_handle;
 
        SLOGI("new handle[%p:,rpc:%p]", new_handle, new_handle->rpc_handle);
 
        return HAL_CODEC_ERROR_NONE;
+
+_INIT_FAILED:
+       if (new_handle->rpc_handle) {
+               __hal_codec_ipc_cb_handle_release(new_handle);
+               rpc_port_proxy_codec_destroy(new_handle->rpc_handle);
+       }
+
+       __hal_codec_ipc_context_release(new_handle);
+
+       g_free(new_handle);
+
+       return ret;
 }
 
 
@@ -360,33 +708,15 @@ int hal_codec_ipc_deinit(void *codec_handle)
 
        SLOGI("start");
 
+       g_mutex_clear(&handle->ipc_buffer.lock);
+       g_cond_clear(&handle->ipc_buffer.cond);
+
        ret = rpc_port_proxy_codec_invoke_deinit(handle->rpc_handle);
        if (ret != RPC_PORT_ERROR_NONE) {
                SLOGE("invoke deinit failed[0x%x]", ret);
                return HAL_CODEC_ERROR_INTERNAL;
        }
-
-       if (handle->rpc_msg_handle) {
-               ret = rpc_port_proxy_codec_message_dispose(handle->rpc_handle, handle->rpc_msg_handle);
-               if (ret != RPC_PORT_ERROR_NONE)
-                       rpc_port_proxy_codec_message_destroy(handle->rpc_msg_handle);
-               handle->rpc_msg_handle = NULL;
-       }
-
-       ret = rpc_port_proxy_codec_disconnect(handle->rpc_handle);
-       if (ret != RPC_PORT_ERROR_NONE) {
-               SLOGE("disconnect failed: handle[%p], ret[%d]", handle->rpc_handle, ret);
-               return HAL_CODEC_ERROR_INTERNAL;
-       }
-
-       ret = rpc_port_proxy_codec_destroy(handle->rpc_handle);
-       if (ret != RPC_PORT_ERROR_NONE) {
-               SLOGE("destroy failed: handle[%p], ret[%d]", handle->rpc_handle, ret);
-               return HAL_CODEC_ERROR_INTERNAL;
-       }
-
-       g_mutex_clear(&handle->buffer_lock);
-       g_mutex_clear(&handle->msg_lock);
+       __hal_codec_ipc_context_release(handle);
 
        SLOGI("release handle[%p]", handle);
 
@@ -421,42 +751,35 @@ int hal_codec_ipc_release(void *codec_handle)
 int hal_codec_ipc_start(void *codec_handle, hal_codec_message_cb callback, void *user_data)
 {
        int ret = HAL_CODEC_ERROR_NONE;
-       int ipc_ret = 0;
        hal_codec_ipc_s *handle = (hal_codec_ipc_s *)codec_handle;
-       rpc_port_proxy_codec_message_h msg_handle = NULL;
+       hal_codec_ipc_buffer_s *ipc_buffer = NULL;
+       hal_codec_ipc_message_s *ipc_message = NULL;
 
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle->rpc_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
 
-       ipc_ret = rpc_port_proxy_codec_message_create(&msg_handle);
-       if (ipc_ret != RPC_PORT_ERROR_NONE) {
-               SLOGE("msg handle create failed[0x%x]", ipc_ret);
-               return HAL_CODEC_ERROR_OUT_OF_MEMORY;
-       }
+       ipc_buffer = &handle->ipc_buffer;
+       ipc_message = &handle->ipc_message;     
 
-       ipc_ret = rpc_port_proxy_codec_message_set_callback(msg_handle,
-               __hal_codec_ipc_message_cb, handle);
-       if (ipc_ret != RPC_PORT_ERROR_NONE) {
-               SLOGE("set callback failed[0x%x]", ipc_ret);
-               rpc_port_proxy_codec_message_destroy(msg_handle);
-               return HAL_CODEC_ERROR_INTERNAL;
-       }
+       handle->is_flushing = false;
 
-       rpc_port_proxy_codec_message_set_once(msg_handle, false);
+       g_mutex_lock(&ipc_buffer->lock);
 
-       g_mutex_lock(&handle->msg_lock);
+       ipc_buffer->input_buffer_count = 0;
+       ipc_buffer->output_buffer_count = 0;
 
-       ret = rpc_port_proxy_codec_invoke_start(handle->rpc_handle, msg_handle);
-       if (ret == HAL_CODEC_ERROR_NONE) {
-               handle->msg_cb = callback;
-               handle->msg_cb_data = user_data;
-               handle->rpc_msg_handle = msg_handle;
-       } else {
-               SLOGE("start failed[0x%x]", ret);
-               rpc_port_proxy_codec_message_destroy(msg_handle);
-       }
+       g_mutex_unlock(&ipc_buffer->lock);
+
+       g_mutex_lock(&ipc_message->lock);
 
-       g_mutex_unlock(&handle->msg_lock);
+       ipc_message->user_cb = callback;
+       ipc_message->user_cb_data = user_data;
+
+       g_mutex_unlock(&ipc_message->lock);
+
+       ret = rpc_port_proxy_codec_invoke_start(handle->rpc_handle);
+       if (ret != HAL_CODEC_ERROR_NONE)
+               SLOGE("start failed[0x%x]", ret);
 
        return ret;
 }
@@ -465,46 +788,62 @@ int hal_codec_ipc_start(void *codec_handle, hal_codec_message_cb callback, void
 int hal_codec_ipc_stop(void *codec_handle)
 {
        int ret = HAL_CODEC_ERROR_NONE;
-       int ipc_ret = 0;
+       gint64 end_time = 0;
+       g_autoptr(GMutexLocker) locker = NULL;
+
        hal_codec_ipc_s *handle = (hal_codec_ipc_s *)codec_handle;
+       hal_codec_ipc_async_return_s *ipc_async_return = NULL;
+
+       rpc_port_proxy_codec_async_return_type_e type = RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_STOP;
 
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle->rpc_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
 
-       ret = rpc_port_proxy_codec_invoke_stop(handle->rpc_handle);
+       ipc_async_return = &handle->ipc_async_return;
 
-       g_mutex_lock(&handle->msg_lock);
+       SLOGI("handle[%p]", handle);
 
-       if (ret == HAL_CODEC_ERROR_NONE) {
-               handle->msg_cb = NULL;
-               handle->msg_cb_data = NULL;
+       locker = g_mutex_locker_new(&ipc_async_return->lock[type]);
 
-               SLOGI("dispose msg handle[%p]", handle->rpc_msg_handle);
+       ret = rpc_port_proxy_codec_invoke_stop(handle->rpc_handle);
+       if (ret != HAL_CODEC_ERROR_NONE) {
+               SLOGE("invoke stop failed[0x%x]", ret);
+               return ret;
+       }
 
-               if (handle->rpc_msg_handle) {
-                       ipc_ret = rpc_port_proxy_codec_message_dispose(handle->rpc_handle, handle->rpc_msg_handle);
-                       if (ipc_ret != RPC_PORT_ERROR_NONE) {
-                               SLOGW("msg handle dispose failed[0x%x]", ipc_ret);
-                               rpc_port_proxy_codec_message_destroy(handle->rpc_msg_handle);
-                       }
+       SLOGI("wait for async return: stop");
 
-                       handle->rpc_msg_handle = NULL;
-               }
+       end_time = g_get_monotonic_time() + 3 * G_TIME_SPAN_SECOND;
+       if (!g_cond_wait_until(&ipc_async_return->cond[type], &ipc_async_return->lock[type], end_time)) {
+               SLOGE("timeout");
+               return HAL_CODEC_ERROR_INTERNAL;
        }
 
-       g_mutex_unlock(&handle->msg_lock);
-
-       return ret;
+       return ipc_async_return->result[type];
 }
 
 
 int hal_codec_ipc_flush(void *codec_handle)
 {
        hal_codec_ipc_s *handle = (hal_codec_ipc_s *)codec_handle;
+       hal_codec_ipc_async_return_s *ipc_async_return = NULL;
 
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle->rpc_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
 
+       ipc_async_return = &handle->ipc_async_return;
+
+       g_mutex_lock(&ipc_async_return->lock[RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_DECODE]);
+
+       handle->is_flushing = true;
+
+       if (ipc_async_return->wait_result[RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_DECODE]) {
+               SLOGI("send signal to wake up decode thread");
+               g_cond_broadcast(&ipc_async_return->cond[RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_DECODE]);
+       }
+
+       g_mutex_unlock(&ipc_async_return->lock[RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_DECODE]);
+
        return rpc_port_proxy_codec_invoke_flush(handle->rpc_handle);
 }
 
@@ -513,9 +852,17 @@ int hal_codec_ipc_decode(void *codec_handle, hal_codec_buffer_s *buffer)
 {
        int ret = HAL_CODEC_ERROR_NONE;
        int ipc_ret = 0;
+       gint64 end_time = 0;
+
        hal_codec_ipc_s *handle = (hal_codec_ipc_s *)codec_handle;
+       hal_codec_ipc_buffer_s *ipc_buffer = NULL;
+       hal_codec_ipc_message_s *ipc_message = NULL;
+       hal_codec_ipc_async_return_s *ipc_async_return = NULL;
+
+       hal_codec_message_s hal_message = {0, };
+
+       rpc_port_proxy_codec_async_return_type_e type = RPC_PORT_PROXY_ASYNC_RETURN_TYPE_CODEC_DECODE;
        bundle *b_buffer = NULL;
-       g_autoptr(GMutexLocker) locker = NULL;
 
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle->rpc_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
@@ -526,17 +873,22 @@ int hal_codec_ipc_decode(void *codec_handle, hal_codec_buffer_s *buffer)
                return HAL_CODEC_ERROR_INVALID_PARAMETER;
        }
 
-       locker = g_mutex_locker_new(&handle->buffer_lock);
+       ipc_buffer = &handle->ipc_buffer;
+       ipc_async_return = &handle->ipc_async_return;
 
-       if (handle->input_buffers[buffer->index]) {
+       g_mutex_lock(&ipc_buffer->lock);
+
+       if (ipc_buffer->input_buffers[buffer->index]) {
                SLOGE("already existed input buffer[%d] %p",
-                       buffer->index, handle->input_buffers[buffer->index]);
+                       buffer->index, ipc_buffer->input_buffers[buffer->index]);
+               g_mutex_unlock(&ipc_buffer->lock);
                return HAL_CODEC_ERROR_INVALID_PARAMETER;
        }
 
        b_buffer = bundle_create();
        if (!b_buffer) {
                SLOGE("bundle failed");
+               g_mutex_unlock(&ipc_buffer->lock);
                return HAL_CODEC_ERROR_OUT_OF_MEMORY;
        }
 
@@ -556,16 +908,69 @@ int hal_codec_ipc_decode(void *codec_handle, hal_codec_buffer_s *buffer)
                goto _IPC_DECODE_DONE;
        }
 
-       SLOGD("buffer[%d] size[%u]", buffer->index, buffer->size);
+       SLOGD("[INPUT] buffer[%d] %p, count[%u -> %u]",
+               buffer->index, buffer,
+               ipc_buffer->input_buffer_count, ipc_buffer->input_buffer_count + 1);
 
-       ret = rpc_port_proxy_codec_invoke_decode(handle->rpc_handle, b_buffer);
+       ipc_buffer->input_buffers[buffer->index] = buffer;
+       ipc_buffer->input_buffer_count++;
+
+       g_mutex_unlock(&ipc_buffer->lock);
+
+       g_mutex_lock(&ipc_async_return->lock[type]);
+
+       if (handle->is_flushing) {
+               SLOGW("now flushing");
+               bundle_free(b_buffer);
+
+               ipc_message = &handle->ipc_message;
+
+               g_mutex_lock(&ipc_message->lock);
+
+               if (ipc_message->user_cb) {
+                       hal_message.type = HAL_CODEC_MESSAGE_TYPE_INPUT_BUFFER_USED;
+                       hal_message.buffer = buffer;
+                       SLOGD("return input buffer[%d] %p", buffer->index, buffer);
+                       ((hal_codec_message_cb)ipc_message->user_cb)(&hal_message, ipc_message->user_cb_data);
+               }
+
+               g_mutex_unlock(&ipc_message->lock);
+               g_mutex_unlock(&ipc_async_return->lock[type]);
 
+               return HAL_CODEC_ERROR_NONE;
+       }
+
+       ret = rpc_port_proxy_codec_invoke_decode(handle->rpc_handle, b_buffer);
        if (ret == HAL_CODEC_ERROR_NONE) {
-               SLOGD("set input buffer[%d] %p", buffer->index, buffer);
-               handle->input_buffers[buffer->index] = buffer;
+               ipc_async_return->wait_result[type] = true;
+
+               while (true) {
+                       end_time = g_get_monotonic_time() + 3 * G_TIME_SPAN_SECOND;
+                       if (g_cond_wait_until(&ipc_async_return->cond[type], &ipc_async_return->lock[type], end_time)) {
+                               SLOGD("signal received[0x%x][flushing:%d]", ret, handle->is_flushing);
+                               ret = ipc_async_return->result[type];
+                               break;
+                       }
+                       SLOGI("timeout, wait again...");
+               }
+
+               ipc_async_return->wait_result[type] = false;
        }
 
+       g_mutex_unlock(&ipc_async_return->lock[type]);
+
+       g_mutex_lock(&ipc_buffer->lock);
+
 _IPC_DECODE_DONE:
+       if (ret != HAL_CODEC_ERROR_NONE &&
+               ipc_buffer->input_buffers[buffer->index]) {
+               SLOGE("[INPUT] decode failed: reset input buffer[%d] %p", buffer->index, buffer);
+               ipc_buffer->input_buffers[buffer->index] = NULL;
+               ipc_buffer->input_buffer_count--;
+       }
+
+       g_mutex_unlock(&ipc_buffer->lock);
+
        bundle_free(b_buffer);
 
        return ret;
@@ -626,27 +1031,43 @@ _IPC_ENCODE_DONE:
 int hal_codec_ipc_release_output_buffer(void *codec_handle, int buffer_index)
 {
        int ret = HAL_CODEC_ERROR_NONE;
+
        hal_codec_ipc_s *handle = (hal_codec_ipc_s *)codec_handle;
+       hal_codec_buffer_s *hal_buffer = NULL;
+       hal_codec_ipc_buffer_s *ipc_buffer = NULL;
+
+       g_autoptr(GMutexLocker) locker = NULL;
 
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
        HAL_CODEC_RETURN_VAL_IF_FAILED(handle->rpc_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
 
+       ipc_buffer = &handle->ipc_buffer;
+
        ret = rpc_port_proxy_codec_invoke_release_output_buffer(handle->rpc_handle, buffer_index);
+       if (ret != HAL_CODEC_ERROR_NONE) {
+               SLOGE("invoke release output buffer failed[0x%x]", ret);
+               return ret;
+       }
 
-       g_mutex_lock(&handle->buffer_lock);
+       locker = g_mutex_locker_new(&ipc_buffer->lock);
 
-       if (ret == HAL_CODEC_ERROR_NONE) {
-               if (handle->output_buffers[buffer_index]) {
-                       __hal_codec_ipc_hal_buffer_release(handle->output_buffers[buffer_index]);
-                       handle->output_buffers[buffer_index] = NULL;
-               } else {
-                       SLOGW("[OUTPUT] no buffer for index[%d]", buffer_index);
-               }
+       hal_buffer = ipc_buffer->output_buffers[buffer_index];
+       ipc_buffer->output_buffers[buffer_index] = NULL;
+
+       if (!hal_buffer) {
+               SLOGW("NULL buffer for index[%d]", buffer_index);
+               return HAL_CODEC_ERROR_NONE;
        }
 
-       g_mutex_unlock(&handle->buffer_lock);
+       SLOGD("[OUTPUT][RELEASE] buffer[%d] %p, count[%u -> %u]",
+               hal_buffer->index, hal_buffer,
+               ipc_buffer->output_buffer_count, ipc_buffer->output_buffer_count - 1);
 
-       return ret;
+       ipc_buffer->output_buffer_count--;
+
+       __hal_codec_ipc_hal_buffer_release(hal_buffer);
+
+       return HAL_CODEC_ERROR_NONE;
 }
 
 
index 1f9a0399a16c8da5d2b536fb384e5c8bd25548fd..e4c61b0d686b7e29a69a46bef34df3fe7d4816fe 100644 (file)
 #endif
 #define LOG_TAG "HAL_BACKEND_SERVICE_CODEC"
 
-#define HAL_CODEC_SERVICE_MSG_LOGE(format, ...) SLOGE(format, ##__VA_ARGS__)
-#define HAL_CODEC_SERVICE_MSG_LOGW(format, ...) SLOGW(format, ##__VA_ARGS__)
-#define HAL_CODEC_SERVICE_MSG_LOGI(format, ...) SLOGI(format, ##__VA_ARGS__)
-#define HAL_CODEC_SERVICE_MSG_LOGD(format, ...) SLOGD(format, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGE(format, ...) SLOGE(format, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGW(format, ...) SLOGW(format, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGI(format, ...) SLOGI(format, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGD(format, ...) SLOGD(format, ##__VA_ARGS__)
 
-#define HAL_CODEC_MSG_RETURN_VAL_IF_FAILED(arg, ret) \
+#define HAL_CODEC_SERVICE_RETURN_IF_FAILED(arg) \
        do {\
                if (!(arg)) {\
-                       HAL_CODEC_SERVICE_MSG_LOGE("[%s]failed, return[%s]", #arg, #ret);\
+                       HAL_CODEC_SERVICE_LOGE("[%s]failed", #arg);\
+                       return;\
+               }\
+       } while (0)
+
+#define HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED(arg, ret) \
+       do {\
+               if (!(arg)) {\
+                       HAL_CODEC_SERVICE_LOGE("[%s]failed, return[%s]", #arg, #ret);\
                        return (ret);\
                }\
        } while (0)
 
-#define HAL_CODEC_SERVICE_LOGE(format, ...) SLOGE("context[%p] "format, context, ##__VA_ARGS__)
-#define HAL_CODEC_SERVICE_LOGW(format, ...) SLOGW("context[%p] "format, context, ##__VA_ARGS__)
-#define HAL_CODEC_SERVICE_LOGI(format, ...) SLOGI("context[%p] "format, context, ##__VA_ARGS__)
-#define HAL_CODEC_SERVICE_LOGD(format, ...) SLOGD("context[%p] "format, context, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGE_CONTEXT(format, ...) SLOGE("context[%p] "format, context, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGW_CONTEXT(format, ...) SLOGW("context[%p] "format, context, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGI_CONTEXT(format, ...) SLOGI("context[%p] "format, context, ##__VA_ARGS__)
+#define HAL_CODEC_SERVICE_LOGD_CONTEXT(format, ...) SLOGD("context[%p] "format, context, ##__VA_ARGS__)
 
-#define HAL_CODEC_RETURN_IF_FAILED(arg) \
+#define HAL_CODEC_SERVICE_RETURN_IF_FAILED_CONTEXT(arg) \
        do {\
                if (!(arg)) {\
-                       HAL_CODEC_SERVICE_LOGE("[%s]failed", #arg);\
+                       HAL_CODEC_SERVICE_LOGE_CONTEXT("[%s]failed", #arg);\
                        return;\
                }\
        } while (0)
 
-#define HAL_CODEC_RETURN_VAL_IF_FAILED(arg, ret) \
+#define HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(arg, ret) \
        do {\
                if (!(arg)) {\
-                       HAL_CODEC_SERVICE_LOGE("[%s]failed, return[%s]", #arg, #ret);\
+                       HAL_CODEC_SERVICE_LOGE_CONTEXT("[%s]failed, return[%s]", #arg, #ret);\
                        return (ret);\
                }\
        } while (0)
 
+#define HAL_CODEC_SERVICE_TASK_PUSH_JOB(task, command, buf, buf_index, LOG_FUNC) \
+       do {\
+               hal_codec_service_task_job_s *job = NULL;\
+               job = g_new0(hal_codec_service_task_job_s, 1);\
+               job->cmd = command;\
+               switch (command) {\
+               case HAL_CODEC_SERVICE_TASK_CMD_DECODE:\
+                       /* fall through */\
+               case HAL_CODEC_SERVICE_TASK_CMD_ENCODE:\
+                       job->buffer = buf;\
+                       break;\
+               case HAL_CODEC_SERVICE_TASK_CMD_RELEASE_OUTPUT_BUFFER:\
+                       job->buffer_index = (intptr_t)buf_index;\
+                       break;\
+               default:\
+                       break;\
+               }\
+               LOG_FUNC("push task command[%d]", job->cmd);\
+               g_mutex_lock(&task.lock);\
+               g_queue_push_tail(&task.queue, job);\
+               g_cond_broadcast(&task.cond);\
+               g_mutex_unlock(&task.lock);\
+       } while (0)
+
+
+typedef enum _hal_codec_service_task_cmd_e {
+       HAL_CODEC_SERVICE_TASK_CMD_STOP = 0,
+       HAL_CODEC_SERVICE_TASK_CMD_DECODE,
+       HAL_CODEC_SERVICE_TASK_CMD_ENCODE,
+       HAL_CODEC_SERVICE_TASK_CMD_RELEASE_OUTPUT_BUFFER,
+       HAL_CODEC_SERVICE_TASK_CMD_FLUSH,
+       HAL_CODEC_SERVICE_TASK_CMD_EXIT_TASK_THREAD
+} hal_codec_service_task_cmd_e;
+
+typedef struct _hal_codec_service_task_job_s {
+       hal_codec_service_task_cmd_e cmd;
+       union {
+               hal_codec_buffer_s *buffer;
+               intptr_t buffer_index;
+       };
+} hal_codec_service_task_job_s;
+
+typedef struct _hal_codec_service_task_s {
+       GThread *thread;
+       GMutex lock;
+       GCond cond;
+       GQueue queue;
+} hal_codec_service_task_s;
 
 typedef struct _hal_codec_service_s {
        void *codec_handle;
-       rpc_port_stub_codec_message_h cb_handle;
+       rpc_port_stub_codec_message_h msg_handle;
+       rpc_port_stub_codec_async_return_h async_return_handle;
        GMutex cb_lock;
+       hal_codec_service_task_s task;
+       bool is_flushing;
 } hal_codec_service_s;
 
 
-
 static hal_codec_service_s *__hal_codec_service_get_handle(rpc_port_stub_codec_context_h context)
 {
        int ipc_ret = 0;
        hal_codec_service_s *service_handle = NULL;
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(context, NULL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(context, NULL);
 
        ipc_ret = rpc_port_stub_codec_context_get_tag(context, (void **)&service_handle);
        if (ipc_ret != RPC_PORT_ERROR_NONE || service_handle == NULL) {
-               HAL_CODEC_SERVICE_LOGE("get handle[%p] from context[%p] failed[0x%x]",
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("get handle[%p] from context[%p] failed[0x%x]",
                        service_handle, context, ipc_ret);
                return NULL;
        }
 
-       HAL_CODEC_SERVICE_LOGI("handle[%p, %p]", service_handle, service_handle->codec_handle);
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("handle[%p, %p]", service_handle, service_handle->codec_handle);
 
        return service_handle;
 }
@@ -100,16 +158,16 @@ static int __hal_codec_service_message_cb(hal_codec_message_s *message, void *us
        rpc_port_stub_array_file_desc_h fd_handle = NULL;
        hal_codec_service_s *service_handle = (hal_codec_service_s *)user_data;
 
-       HAL_CODEC_MSG_RETURN_VAL_IF_FAILED(message, HAL_CODEC_ERROR_INVALID_PARAMETER);
-       HAL_CODEC_MSG_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED(message, HAL_CODEC_ERROR_INVALID_PARAMETER);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
 
-       HAL_CODEC_SERVICE_MSG_LOGD("message[%p] type[%d] handle[service:%p,cb_handle:%p]",
-               message, message->type, service_handle, service_handle->cb_handle);
+       HAL_CODEC_SERVICE_LOGD("message[%p] type[%d] handle[service:%p,msg_handle:%p]",
+               message, message->type, service_handle, service_handle->msg_handle);
 
        b_msg = bundle_create();
        ipc_ret = bundle_add_byte(b_msg, HAL_CODEC_IPC_PARAM_KEY_MESSAGE, (const void *)message, sizeof(hal_codec_message_s));
        if (ipc_ret != BUNDLE_ERROR_NONE) {
-               HAL_CODEC_SERVICE_MSG_LOGE("add byte for message[type:%d] failed[0x%x]", message->type, ipc_ret);
+               HAL_CODEC_SERVICE_LOGE("add byte for message[type:%d] failed[0x%x]", message->type, ipc_ret);
                bundle_free(b_msg);
                return HAL_CODEC_ERROR_INTERNAL;
        }
@@ -118,13 +176,13 @@ static int __hal_codec_service_message_cb(hal_codec_message_s *message, void *us
        case HAL_CODEC_MESSAGE_TYPE_INPUT_BUFFER_USED:
                buffer = message->buffer;
                if (!buffer) {
-                       HAL_CODEC_SERVICE_MSG_LOGE("NULL buffer for INPUT_BUFFER_USED, message[%p]", message);
+                       HAL_CODEC_SERVICE_LOGE("NULL buffer for INPUT_BUFFER_USED, message[%p]", message);
                        goto _SERVICE_MESSAGE_CB_DONE;
                }
 
                bundle_add_byte(b_msg, HAL_CODEC_IPC_PARAM_KEY_BUFFER, (const void *)buffer, sizeof(hal_codec_buffer_s));
 
-               HAL_CODEC_SERVICE_MSG_LOGD("[INPUT_BUFFER_USED] buffer[%p, index:%d, num_fd:%d] fd[%d:%d:%d:%d]",
+               HAL_CODEC_SERVICE_LOGD("[INPUT_BUFFER_USED] buffer[%p, index:%d, num_fd:%d] fd[%d:%d:%d:%d]",
                        buffer, buffer->index, buffer->memory.num_fd,
                        buffer->memory.fd[0],
                        buffer->memory.fd[1],
@@ -135,7 +193,7 @@ static int __hal_codec_service_message_cb(hal_codec_message_s *message, void *us
                rpc_port_stub_array_file_desc_set(fd_handle, (int *)buffer->memory.fd, buffer->memory.num_fd);
 
                for (uint32_t i = 0; i < buffer->planes.num_planes; i++) {
-                       HAL_CODEC_SERVICE_MSG_LOGD("free INPUT_BUFFER[%d] %p", i, buffer->planes.plane[i].data);
+                       HAL_CODEC_SERVICE_LOGD("free INPUT_BUFFER[%d] %p", i, buffer->planes.plane[i].data);
                        g_free(buffer->planes.plane[i].data);
                }
 
@@ -145,11 +203,11 @@ static int __hal_codec_service_message_cb(hal_codec_message_s *message, void *us
     case HAL_CODEC_MESSAGE_TYPE_OUTPUT_BUFFER:
                buffer = message->buffer;
                if (!buffer) {
-                       HAL_CODEC_SERVICE_MSG_LOGE("NULL buffer for OUTPUT_BUFFER, message[%p]", message);
+                       HAL_CODEC_SERVICE_LOGE("NULL buffer for OUTPUT_BUFFER, message[%p]", message);
                        goto _SERVICE_MESSAGE_CB_DONE;
                }
 
-               HAL_CODEC_SERVICE_MSG_LOGD("[OUTPUT_BUFFER] buffer[%p, index:%d, num_planes:%d] fd[%d:%d:%d:%d]",
+               HAL_CODEC_SERVICE_LOGD("[OUTPUT_BUFFER] buffer[%p, index:%d, num_planes:%d] fd[%d:%d:%d:%d]",
                        buffer, buffer->index, buffer->planes.num_planes,
                        buffer->memory.fd[0],
                        buffer->memory.fd[1],
@@ -168,22 +226,30 @@ static int __hal_codec_service_message_cb(hal_codec_message_s *message, void *us
 
        g_mutex_lock(&service_handle->cb_lock);
 
-       if (service_handle->cb_handle == NULL) {
-               HAL_CODEC_SERVICE_MSG_LOGW("NULL cb_handle");
+       if (message->type == HAL_CODEC_MESSAGE_TYPE_OUTPUT_BUFFER &&
+               service_handle->is_flushing) {
                g_mutex_unlock(&service_handle->cb_lock);
+               HAL_CODEC_SERVICE_LOGW("[OUTPUT_BUFFER] flushing:release buffer[%d]", buffer->index);
+               hal_codec_passthrough_release_output_buffer(service_handle->codec_handle, buffer->index);
                goto _SERVICE_MESSAGE_CB_DONE;
        }
 
-       ipc_ret = rpc_port_stub_codec_message_invoke(service_handle->cb_handle, b_msg, fd_handle);
+       if (service_handle->msg_handle == NULL) {
+               HAL_CODEC_SERVICE_LOGW("NULL msg_handle");
+               g_mutex_unlock(&service_handle->cb_lock);
+               goto _SERVICE_MESSAGE_CB_DONE;
+       }
+
+       ipc_ret = rpc_port_stub_codec_message_invoke(service_handle->msg_handle, b_msg, fd_handle);
        if (ipc_ret != RPC_PORT_ERROR_NONE) {
-               HAL_CODEC_SERVICE_MSG_LOGE("Failed to invoke message_cb");
+               HAL_CODEC_SERVICE_LOGE("Failed to invoke message_cb");
                g_mutex_unlock(&service_handle->cb_lock);
                goto _SERVICE_MESSAGE_CB_DONE;
        }
 
        g_mutex_unlock(&service_handle->cb_lock);
 
-       HAL_CODEC_SERVICE_MSG_LOGD("invoke message: type[%d]", message->type);
+       HAL_CODEC_SERVICE_LOGD("invoke message: type[%d]", message->type);
 
 _SERVICE_MESSAGE_CB_DONE:
        bundle_free(b_msg);
@@ -193,51 +259,309 @@ _SERVICE_MESSAGE_CB_DONE:
 }
 
 
+static gpointer __hal_codec_service_task_func(gpointer data)
+{
+       int ret = 0;
+       int ipc_ret = 0;
+       void *codec_handle = NULL;
+
+       rpc_port_stub_codec_context_h context = (rpc_port_stub_codec_context_h)data;
+       rpc_port_stub_codec_async_return_type_e return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_NONE;
+
+       hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);;
+       hal_codec_service_task_s *task = NULL;
+       hal_codec_service_task_job_s *job = NULL;
+
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, NULL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle->codec_handle, NULL);
+
+       codec_handle = service_handle->codec_handle;
+
+       SLOGI("start: handle[context:%p, service:%p, backend:%p]",
+               context, service_handle, codec_handle);
+
+       task = &service_handle->task;
+
+       g_mutex_lock(&task->lock);
+
+       while (1) {
+               if (g_queue_is_empty(&task->queue)) {
+                       SLOGD("wait for task");
+                       g_cond_wait(&task->cond, &task->lock);
+                       SLOGD("signal received, check again");
+                       continue;
+               }
+
+               job = g_queue_pop_head(&task->queue);
+               if (!job) {
+                       SLOGW("NULL job");
+                       continue;
+               }
+
+               if (job->cmd == HAL_CODEC_SERVICE_TASK_CMD_EXIT_TASK_THREAD) {
+                       SLOGI("exit task thread");
+                       g_free(job);
+                       break;
+               }
+
+               g_mutex_unlock(&task->lock);
+
+               SLOGD("cmd[%d]", job->cmd);
+
+               return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_NONE;
+
+               switch (job->cmd) {
+               case HAL_CODEC_SERVICE_TASK_CMD_STOP:
+                       return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_STOP;
+                       ret = hal_codec_passthrough_stop(codec_handle);
+                       if (ret != HAL_CODEC_ERROR_NONE)
+                               HAL_CODEC_SERVICE_LOGE_CONTEXT("stop failed[0x%x]", ret);
+                       break;
+               case HAL_CODEC_SERVICE_TASK_CMD_DECODE:
+                       return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_DECODE;
+                       ret = hal_codec_passthrough_decode(codec_handle, job->buffer);
+                       if (ret != HAL_CODEC_ERROR_NONE) {
+                               HAL_CODEC_SERVICE_LOGE_CONTEXT("decode failed[0x%x]", ret);
+                               if (job->buffer) {
+                                       g_free(job->buffer->planes.plane[0].data);
+                                       g_free(job->buffer);
+                                       job->buffer = NULL;
+                               }
+                       }
+                       break;
+               case HAL_CODEC_SERVICE_TASK_CMD_ENCODE:
+                       return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_ENCODE;
+                       ret = hal_codec_passthrough_encode(codec_handle, job->buffer);
+                       if (ret != HAL_CODEC_ERROR_NONE) {
+                               HAL_CODEC_SERVICE_LOGE_CONTEXT("encode failed[0x%x]", ret);
+                               /* TODO: release buffer? */
+                       }
+                       break;
+               case HAL_CODEC_SERVICE_TASK_CMD_RELEASE_OUTPUT_BUFFER:
+                       return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_RELEASE_OUTPUT_BUFFER;
+                       ret = hal_codec_passthrough_release_output_buffer(codec_handle, (int)job->buffer_index);
+                       if (ret != HAL_CODEC_ERROR_NONE)
+                               HAL_CODEC_SERVICE_LOGE_CONTEXT("release_output_buffer[%d] failed[0x%x]", (int)job->buffer_index, ret);
+                       break;
+               case HAL_CODEC_SERVICE_TASK_CMD_FLUSH:
+                       return_type = RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_FLUSH;
+                       ret = hal_codec_passthrough_flush(codec_handle);
+                       if (ret != HAL_CODEC_ERROR_NONE)
+                               HAL_CODEC_SERVICE_LOGE_CONTEXT("flush failed[0x%x]", ret);
+                       break;
+               default:
+                       SLOGW("unknown cmd[%d]", job->cmd);
+                       break;
+               }
+
+               g_free(job);
+
+               if (return_type != RPC_PORT_STUB_ASYNC_RETURN_TYPE_CODEC_NONE) {
+                       HAL_CODEC_SERVICE_LOGD("invoke async_return_cb: type[%d], ret[0x%x]", return_type, ret);
+                       ipc_ret = rpc_port_stub_codec_async_return_invoke(service_handle->async_return_handle, return_type, ret);
+                       if (ipc_ret != RPC_PORT_ERROR_NONE)
+                               HAL_CODEC_SERVICE_LOGE("Failed async_return_cb: type[%d], ret[0x%x]", return_type, ipc_ret);
+               }
+
+               g_mutex_lock(&task->lock);
+       }
+
+       g_mutex_unlock(&task->lock);
+
+       SLOGI("leave: context[%p], service_handle[%p]", context, service_handle);
+
+       return NULL;
+}
+
+
 static void _hal_codec_service_rpc_create(rpc_port_stub_codec_context_h context, void *user_data)
 {
-       HAL_CODEC_SERVICE_LOGI("create");
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("create");
 }
 
 
 static void _hal_codec_service_rpc_terminate(rpc_port_stub_codec_context_h context, void *user_data)
 {
-       HAL_CODEC_SERVICE_LOGI("terminate");
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("terminate");
+}
+
+
+static void __hal_codec_service_task_init(rpc_port_stub_codec_context_h context)
+{
+       hal_codec_service_task_s *task = NULL;
+       hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
+
+       HAL_CODEC_SERVICE_RETURN_IF_FAILED_CONTEXT(service_handle);
+
+       task = &service_handle->task;
+
+       g_mutex_init(&task->lock);
+       g_cond_init(&task->cond);
+       g_queue_init(&task->queue);
+
+       task->thread = g_thread_new("HAL:Codec:Task",
+               __hal_codec_service_task_func, context);
+
+       SLOGI("new task thread[%p]", task->thread);
+}
+
+
+static void __hal_codec_service_task_job_free(gpointer data)
+{
+       hal_codec_service_task_job_s *job = (hal_codec_service_task_job_s *)data;
+
+       if (!job)
+               return;
+
+       switch (job->cmd) {
+       case HAL_CODEC_SERVICE_TASK_CMD_STOP:
+               SLOGW("remained job [STOP]");
+               break;
+       case HAL_CODEC_SERVICE_TASK_CMD_DECODE:
+               if (job->buffer) {
+                       SLOGW("remained job [DECODE]: buffer[%d]", job->buffer->index);
+                       g_free(job->buffer->planes.plane[0].data);
+                       g_free(job->buffer);
+                       job->buffer = NULL;
+               }
+               break;
+       case HAL_CODEC_SERVICE_TASK_CMD_ENCODE:
+               SLOGW("remained job [ENCODE]");
+               break;
+       case HAL_CODEC_SERVICE_TASK_CMD_RELEASE_OUTPUT_BUFFER:
+               SLOGW("remained job [RELEASE_OUTPUT_BUFFER]");
+               break;
+       case HAL_CODEC_SERVICE_TASK_CMD_FLUSH:
+               SLOGW("remained job [FLUSH]");
+               break;
+       case HAL_CODEC_SERVICE_TASK_CMD_EXIT_TASK_THREAD:
+               SLOGW("remained job [EXIT_TASK_THREAD]");
+               break;
+       default:
+               SLOGW("unknown cmd[%d]", job->cmd);
+               break;
+       }
+
+       g_free(job);
+}
+
+
+static void __hal_codec_service_task_deinit(rpc_port_stub_codec_context_h context)
+{
+       hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
+
+       HAL_CODEC_SERVICE_RETURN_IF_FAILED_CONTEXT(service_handle);
+
+       HAL_CODEC_SERVICE_TASK_PUSH_JOB(service_handle->task, HAL_CODEC_SERVICE_TASK_CMD_EXIT_TASK_THREAD, NULL, -1, SLOGI);
+
+       g_thread_join(service_handle->task.thread);
+       service_handle->task.thread = NULL;
+
+       g_queue_clear_full(&service_handle->task.queue, __hal_codec_service_task_job_free);
+
+       SLOGI("done");
+}
+
+
+static int __hal_codec_service_cb_handle_new(hal_codec_service_s *service_handle,
+       rpc_port_stub_codec_message_h msg_handle, rpc_port_stub_codec_async_return_h async_return_handle)
+{
+       int rpc_ret = RPC_PORT_ERROR_NONE;
+       rpc_port_stub_codec_message_h msg_handle_new = NULL;
+       rpc_port_stub_codec_async_return_h async_return_handle_new = NULL;
+
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED(msg_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED(async_return_handle, HAL_CODEC_ERROR_INVALID_PARAMETER);
+
+       rpc_ret = rpc_port_stub_codec_message_clone(msg_handle, &msg_handle_new);
+       if (rpc_ret != RPC_PORT_ERROR_NONE) {
+               HAL_CODEC_SERVICE_LOGE("message handle clone failed[0x%x]", rpc_ret);
+               return HAL_CODEC_ERROR_INTERNAL;
+       }
+
+       rpc_ret = rpc_port_stub_codec_async_return_clone(async_return_handle, &async_return_handle_new);
+       if (rpc_ret != RPC_PORT_ERROR_NONE) {
+               HAL_CODEC_SERVICE_LOGE("async return handle clone failed[0x%x]", rpc_ret);
+               rpc_port_stub_codec_message_destroy(msg_handle_new);
+               return HAL_CODEC_ERROR_INTERNAL;
+       }
+
+       HAL_CODEC_SERVICE_LOGI("new cb handle: message[%p], async_return[%p]",
+               msg_handle_new, async_return_handle);
+
+       service_handle->msg_handle = msg_handle_new;
+       service_handle->async_return_handle = async_return_handle_new;
+
+       return HAL_CODEC_ERROR_NONE;
+}
+
+
+static void __hal_codec_service_cb_handle_release(hal_codec_service_s *service_handle)
+{
+       HAL_CODEC_SERVICE_RETURN_IF_FAILED(service_handle);
+
+       HAL_CODEC_SERVICE_LOGI("release cb handle: message[%p], async_return[%p]",
+               service_handle->msg_handle, service_handle->async_return_handle);
+
+       if (service_handle->msg_handle) {
+               rpc_port_stub_codec_message_destroy(service_handle->msg_handle);
+               service_handle->msg_handle = NULL;
+       }
+
+       if (service_handle->async_return_handle) {
+               rpc_port_stub_codec_async_return_destroy(service_handle->async_return_handle);
+               service_handle->async_return_handle = NULL;
+       }
+
+       HAL_CODEC_SERVICE_LOGI("release cb handle: done");
 }
 
 
-static int _hal_codec_service_backend_init(rpc_port_stub_codec_context_h context, int type, void *user_data)
+static int _hal_codec_service_backend_init(rpc_port_stub_codec_context_h context, int type,
+       rpc_port_stub_codec_message_h msg_handle, rpc_port_stub_codec_async_return_h async_return_handle, void *user_data)
 {
        int ret = HAL_CODEC_ERROR_NONE;
        int rpc_ret = RPC_PORT_ERROR_NONE;
        hal_codec_service_s *service_handle = NULL;
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(context, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(context, HAL_CODEC_ERROR_INTERNAL);
 
-       HAL_CODEC_SERVICE_LOGI("type[%d]", type);
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("type[%d]", type);
 
        service_handle = g_new0(hal_codec_service_s, 1);
 
        ret = hal_codec_passthrough_init(type, &service_handle->codec_handle);
        if (ret != HAL_CODEC_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("backend init failed[0x%x]", ret);
-               g_free(service_handle);
-               return ret;
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("backend init failed[0x%x]", ret);
+               goto _BACKEND_INIT_FAILED;
        }
 
        rpc_ret = rpc_port_stub_codec_context_set_tag(context, (char *)service_handle);
        if (rpc_ret != RPC_PORT_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("set handle to context failed[0x%x]", rpc_ret);
-               hal_codec_passthrough_deinit(service_handle->codec_handle);
-               g_free(service_handle);
-               return HAL_CODEC_ERROR_INTERNAL;
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("set handle to context failed[0x%x]", rpc_ret);
+               goto _BACKEND_INIT_FAILED;
        }
 
+       ret = __hal_codec_service_cb_handle_new(service_handle, msg_handle, async_return_handle);
+       if (ret != HAL_CODEC_ERROR_NONE)
+               goto _BACKEND_INIT_FAILED;
+
        g_mutex_init(&service_handle->cb_lock);
+       __hal_codec_service_task_init(context);
 
-       HAL_CODEC_SERVICE_LOGI("new handle: service[%p], codec[%p]",
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("new handle: service[%p], codec[%p]",
                service_handle, service_handle->codec_handle);
 
        return HAL_CODEC_ERROR_NONE;
+
+_BACKEND_INIT_FAILED:
+       if (service_handle->codec_handle)
+               hal_codec_passthrough_deinit(service_handle->codec_handle);
+
+       g_free(service_handle);
+
+       return ret;
 }
 
 
@@ -246,27 +570,25 @@ static int _hal_codec_service_backend_deinit(rpc_port_stub_codec_context_h conte
        int ret = HAL_CODEC_ERROR_NONE;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
-       ret = hal_codec_passthrough_deinit(service_handle->codec_handle);
-       if (ret != HAL_CODEC_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("deinit failed[0x%x]", ret);
-               return ret;
-       }
+       __hal_codec_service_task_deinit(context);
 
-       service_handle->codec_handle = NULL;
+       g_mutex_clear(&service_handle->cb_lock);
 
-       if (service_handle->cb_handle) {
-               HAL_CODEC_SERVICE_LOGW("remove remained cb_handle[%p]", service_handle->cb_handle);
-               rpc_port_stub_codec_message_destroy(service_handle->cb_handle);
-               service_handle->cb_handle = NULL;
-       }
+       __hal_codec_service_cb_handle_release(service_handle);
 
-       HAL_CODEC_SERVICE_LOGI("reset tag[service_handle:%p] in context", service_handle);
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("reset tag[service_handle:%p] in context", service_handle);
 
        rpc_port_stub_codec_context_set_tag(context, NULL);
 
-       g_mutex_clear(&service_handle->cb_lock);
+       ret = hal_codec_passthrough_deinit(service_handle->codec_handle);
+       if (ret != HAL_CODEC_ERROR_NONE) {
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("deinit failed[0x%x]", ret);
+               return ret;
+       }
+
+       service_handle->codec_handle = NULL;
 
        g_free(service_handle);
 
@@ -280,16 +602,16 @@ static int _hal_codec_service_backend_configure(rpc_port_stub_codec_context_h co
        int ret = HAL_CODEC_ERROR_NONE;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
-       HAL_CODEC_SERVICE_LOGI("[%dx%d], format[in:%d,out:%d], secure[%d]",
+       HAL_CODEC_SERVICE_LOGI_CONTEXT("[%dx%d], format[in:%d,out:%d], secure[%d]",
                width, height, in_format, out_format, is_secure);
 
        ret = hal_codec_passthrough_configure(service_handle->codec_handle,
                width, height, in_format, out_format, is_secure);
 
        if (ret != HAL_CODEC_ERROR_NONE)
-               HAL_CODEC_SERVICE_LOGE("configure failed[0x%x]", ret);
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("configure failed[0x%x]", ret);
 
        return ret;
 }
@@ -300,43 +622,32 @@ static int _hal_codec_service_backend_release(rpc_port_stub_codec_context_h cont
        int ret = HAL_CODEC_ERROR_NONE;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
        ret = hal_codec_passthrough_release(service_handle->codec_handle);
 
        if (ret != HAL_CODEC_ERROR_NONE)
-               HAL_CODEC_SERVICE_LOGE("release failed[0x%x]", ret);
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("release failed[0x%x]", ret);
 
        return ret;
 }
 
 
-static int _hal_codec_service_backend_start(rpc_port_stub_codec_context_h context, rpc_port_stub_codec_message_h cb_handle, void *user_data)
+static int _hal_codec_service_backend_start(rpc_port_stub_codec_context_h context, void *user_data)
 {
        int ret = HAL_CODEC_ERROR_NONE;
-       int ipc_ret = 0;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
-       rpc_port_stub_codec_message_h cb_handle_new = NULL;
-
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
-       ipc_ret = rpc_port_stub_codec_message_clone(cb_handle, &cb_handle_new);
-       if (ipc_ret != RPC_PORT_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("msg cb handle clone failed[0x%x]", ipc_ret);
-               return HAL_CODEC_ERROR_INTERNAL;
-       }
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
        g_mutex_lock(&service_handle->cb_lock);
-       service_handle->cb_handle = cb_handle_new;
+       service_handle->is_flushing = false;
        g_mutex_unlock(&service_handle->cb_lock);
 
        ret = hal_codec_passthrough_start(service_handle->codec_handle,
                __hal_codec_service_message_cb, service_handle);
-       if (ret != HAL_CODEC_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("start failed[0x%x]", ret);
-               rpc_port_stub_codec_message_destroy(cb_handle_new);
-               service_handle->cb_handle = NULL;
-       }
+       if (ret != HAL_CODEC_ERROR_NONE)
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("start failed[0x%x]", ret);
 
        return ret;
 }
@@ -344,23 +655,11 @@ static int _hal_codec_service_backend_start(rpc_port_stub_codec_context_h contex
 
 static int _hal_codec_service_backend_stop(rpc_port_stub_codec_context_h context, void *user_data)
 {
-       int ret = HAL_CODEC_ERROR_NONE;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
-       ret = hal_codec_passthrough_stop(service_handle->codec_handle);
-       if (ret != HAL_CODEC_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("stop failed[0x%x]", ret);
-               return ret;
-       }
-
-       HAL_CODEC_SERVICE_LOGD("release cb_handle[%p]", service_handle->cb_handle);
-       rpc_port_stub_codec_message_destroy(service_handle->cb_handle);
-
-       g_mutex_lock(&service_handle->cb_lock);
-       service_handle->cb_handle = NULL;
-       g_mutex_unlock(&service_handle->cb_lock);
+       HAL_CODEC_SERVICE_TASK_PUSH_JOB(service_handle->task, HAL_CODEC_SERVICE_TASK_CMD_STOP, NULL, -1, SLOGI);
 
        return HAL_CODEC_ERROR_NONE;
 }
@@ -368,34 +667,36 @@ static int _hal_codec_service_backend_stop(rpc_port_stub_codec_context_h context
 
 static int _hal_codec_service_backend_flush(rpc_port_stub_codec_context_h context, void *user_data)
 {
-       int ret = HAL_CODEC_ERROR_NONE;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
-       ret = hal_codec_passthrough_flush(service_handle->codec_handle);
+       g_mutex_lock(&service_handle->cb_lock);
 
-       if (ret != HAL_CODEC_ERROR_NONE)
-               HAL_CODEC_SERVICE_LOGE("flush failed[0x%x]", ret);
+       HAL_CODEC_SERVICE_LOGE_CONTEXT("set flushing");
+       service_handle->is_flushing = true;
 
-       return ret;
+       g_mutex_unlock(&service_handle->cb_lock);
+
+       HAL_CODEC_SERVICE_TASK_PUSH_JOB(service_handle->task, HAL_CODEC_SERVICE_TASK_CMD_FLUSH, NULL, -1, SLOGI);
+
+       return HAL_CODEC_ERROR_NONE;
 }
 
 
 static int _hal_codec_service_backend_decode(rpc_port_stub_codec_context_h context, bundle *b_buffer, void *user_data)
 {
-       int ret = HAL_CODEC_ERROR_NONE;
        int ipc_ret = 0;
        void *b_data = NULL;
        size_t b_size = 0;
        hal_codec_buffer_s *buffer = NULL;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
        ipc_ret = bundle_get_byte(b_buffer, HAL_CODEC_IPC_PARAM_KEY_BUFFER, &b_data, &b_size);
        if (ipc_ret != BUNDLE_ERROR_NONE || b_data == NULL || b_size != sizeof(hal_codec_buffer_s)) {
-               HAL_CODEC_SERVICE_LOGE("get data to decode failed[0x%x,%p,%zu/%zu",
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("get data to decode failed[0x%x,%p,%zu/%zu",
                        ipc_ret, b_data, b_size, sizeof(hal_codec_buffer_s));
                return HAL_CODEC_ERROR_INTERNAL;
        }
@@ -407,47 +708,36 @@ static int _hal_codec_service_backend_decode(rpc_port_stub_codec_context_h conte
 
        ipc_ret = bundle_get_byte(b_buffer, HAL_CODEC_IPC_PARAM_KEY_PLANE0_DATA, &b_data, &b_size);
        if (ipc_ret != BUNDLE_ERROR_NONE || b_data == NULL || b_size != buffer->planes.plane[0].size) {
-               HAL_CODEC_SERVICE_LOGE("get data to decode failed[0x%x,%p,%zu/%u]",
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("get data to decode failed[0x%x,%p,%zu/%u]",
                        ipc_ret, b_data, b_size, buffer->planes.plane[0].size);
-               ret = HAL_CODEC_ERROR_INTERNAL;
-               goto _SERVICE_DECODE_DONE;
+               g_free(buffer);
+               return HAL_CODEC_ERROR_INTERNAL;
        }
 
        buffer->planes.plane[0].data = (unsigned char *)g_memdup2(b_data, b_size);
 
-       HAL_CODEC_SERVICE_LOGD("buffer[%d] size[%zu]", buffer->index, b_size);
-
-       ret = hal_codec_passthrough_decode(service_handle->codec_handle, buffer);
-
-_SERVICE_DECODE_DONE:
-       if (ret != HAL_CODEC_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("decode failed[0x%x]", ret);
-
-               if (buffer->planes.plane[0].data)
-                       g_free(buffer->planes.plane[0].data);
+       HAL_CODEC_SERVICE_LOGD_CONTEXT("buffer[%d] size[%zu]", buffer->index, b_size);
 
-               g_free(buffer);
-       }
+       HAL_CODEC_SERVICE_TASK_PUSH_JOB(service_handle->task, HAL_CODEC_SERVICE_TASK_CMD_DECODE, buffer, -1, SLOGD);
 
-       return ret;
+       return HAL_CODEC_ERROR_NONE;
 }
 
 
 static int _hal_codec_service_backend_release_output_buffer(rpc_port_stub_codec_context_h context, int buffer_index, void *user_data)
 {
-       int ret = HAL_CODEC_ERROR_NONE;
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
-
-       HAL_CODEC_SERVICE_LOGD("buffer index[%d]", buffer_index);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
-       ret = hal_codec_passthrough_release_output_buffer(service_handle->codec_handle, buffer_index);
+       HAL_CODEC_SERVICE_LOGD_CONTEXT("buffer index[%d]", buffer_index);
+#if 0
+       HAL_CODEC_SERVICE_TASK_PUSH_JOB(service_handle->task, HAL_CODEC_SERVICE_TASK_CMD_RELEASE_OUTPUT_BUFFER, NULL, buffer_index, SLOGD);
 
-       if (ret != HAL_CODEC_ERROR_NONE)
-               HAL_CODEC_SERVICE_LOGE("release_output_buffer failed[0x%x]", ret);
-
-       return ret;
+       return HAL_CODEC_ERROR_NONE;
+#else
+       return hal_codec_passthrough_release_output_buffer(service_handle->codec_handle, buffer_index);
+#endif
 }
 
 
@@ -457,12 +747,12 @@ static int _hal_codec_service_backend_get_state(rpc_port_stub_codec_context_h co
        hal_codec_service_s *service_handle = __hal_codec_service_get_handle(context);
        hal_codec_state_e state_;
 
-       HAL_CODEC_RETURN_VAL_IF_FAILED(service_handle, HAL_CODEC_ERROR_INTERNAL);
+       HAL_CODEC_SERVICE_RETURN_VAL_IF_FAILED_CONTEXT(service_handle, HAL_CODEC_ERROR_INTERNAL);
 
        ret = hal_codec_passthrough_get_state(service_handle->codec_handle, &state_);
 
        if (ret != HAL_CODEC_ERROR_NONE) {
-               HAL_CODEC_SERVICE_LOGE("get_state failed[0x%x]", ret);
+               HAL_CODEC_SERVICE_LOGE_CONTEXT("get_state failed[0x%x]", ret);
                return ret;
        }