Use receiving thread for c stub 61/239261/9
authorhyunho <hhstark.kang@samsung.com>
Thu, 23 Jul 2020 07:45:43 +0000 (16:45 +0900)
committerhyunho <hhstark.kang@samsung.com>
Fri, 31 Jul 2020 04:45:39 +0000 (13:45 +0900)
Change-Id: Ia309013c19dba6f81115c0aafbacd7ebb638f229
Signed-off-by: hyunho <hhstark.kang@samsung.com>
idlc/gen/c_gen_base_cb.h
idlc/gen/c_stub_body_gen.cc
idlc/gen/c_stub_body_gen.h
idlc/gen/c_stub_body_gen_cb.h
idlc/main.cc

index 5fe9c44..aa20818 100644 (file)
@@ -38,6 +38,7 @@ R"__c_cb(
 
 const char CB_BODY_HEADER[] =
 R"__c_cb(
+#include <pthread.h>
 #include <stdio.h>
 #include <string.h>
 #include <stdlib.h>
index 74909f6..7650f5d 100644 (file)
@@ -22,8 +22,9 @@ namespace {
 
 namespace tidl {
 
-CStubBodyGen::CStubBodyGen(std::shared_ptr<Document> doc)
-    : CBodyGeneratorBase(doc) {}
+CStubBodyGen::CStubBodyGen(std::shared_ptr<Document> doc,
+    std::shared_ptr<Options> options)
+    : CBodyGeneratorBase(doc), options_(options) {}
 
 void CStubBodyGen::OnInitGen(std::ofstream& stream) {
   GenVersion(stream);
@@ -124,6 +125,12 @@ void CStubBodyGen::GenInterfaceMethodTable(std::ofstream& stream,
 
 void CStubBodyGen::GenInterfaceOnConnectedEventCB(std::ofstream& stream,
                                                   const Interface& inf) {
+  if (options_->IsThreadEnabled()) {
+    stream << SmartIndent(
+      ReplaceAll(CB_INTERFACE_THREAD_METHODS, "##",
+          GetInterfaceIdWithNamespace(inf)));
+  }
+
   stream << SmartIndent(
       ReplaceAll(CB_INTERFACE_ON_CONNECTED, "##",
           GetInterfaceIdWithNamespace(inf)));
@@ -138,21 +145,46 @@ void CStubBodyGen::GenInterfaceOnDisconnectedEventCB(std::ofstream& stream,
 
 void CStubBodyGen::GenInterfaceOnReceivedEventCB(std::ofstream& stream,
                                                  const Interface& inf) {
-  stream << SmartIndent(
-      ReplaceAll(CB_INTERFACE_ON_RECEIVED, "##",
+  if (options_->IsThreadEnabled()) {
+    stream << SmartIndent(
+      ReplaceAll(CB_INTERFACE_THREAD_ON_RECEIVED, "##",
           GetInterfaceIdWithNamespace(inf)));
+  } else {
+    stream << SmartIndent(
+        ReplaceAll(CB_INTERFACE_ON_RECEIVED, "##",
+            GetInterfaceIdWithNamespace(inf)));
+  }
 }
 
 void CStubBodyGen::GenInterfaceRegister(std::ofstream& stream,
                                         const Interface& inf) {
-  stream << SmartIndent(ReplaceAll(CB_INTERFACE_REGISTER, "##",
-      GetInterfaceIdWithNamespace(inf)));
+  std::string r_str = ReplaceAll(CB_INTERFACE_REGISTER, "##",
+      GetInterfaceIdWithNamespace(inf));
+
+  stream << SmartIndent(GenTemplateString(r_str,
+        [&]()->std::string {
+          if (options_->IsThreadEnabled())
+            return CB_INTERFACE_THREAD_CREATE;
+          return "";
+        }));
 }
 
 void CStubBodyGen::GenInterfaceUnregister(std::ofstream& stream,
                                           const Interface& inf) {
-  stream << SmartIndent(ReplaceAll(CB_INTERFACE_UNREGISTER, "##",
-      GetInterfaceIdWithNamespace(inf)));
+  std::string r_str = ReplaceAll(CB_INTERFACE_UNREGISTER, "##",
+      GetInterfaceIdWithNamespace(inf));
+
+  stream << SmartIndent(GenTemplateString(r_str,
+        [&]()->std::string {
+          if (options_->IsThreadEnabled())
+            return "job_h job;";
+          return "";
+        },
+        [&]()->std::string {
+          if (options_->IsThreadEnabled())
+            return CB_INTERFACE_THREAD_DESTROY;
+          return "";
+        }));
 }
 
 void CStubBodyGen::GenInterfaceGlobalVariables(std::ofstream& stream,
@@ -497,6 +529,10 @@ void CStubBodyGen::GenInterfaceContextDeclaration(std::ofstream& stream,
                                                   const Interface& inf) {
   stream << SmartIndent(ReplaceAll(
       CB_INTERFACE_CONTEXT_DECL, "##", GetInterfaceIdWithNamespace(inf)));
+  if (options_->IsThreadEnabled()) {
+    stream << SmartIndent(ReplaceAll(
+      CB_INTERFACE_THREAD_FEATURE, "##", GetInterfaceIdWithNamespace(inf)));
+  }
 }
 
 void CStubBodyGen::GenInterfaceContextConstructor(std::ofstream& stream,
index 16bf22e..f787774 100644 (file)
 #include <string>
 
 #include "idlc/gen/c_body_gen_base.h"
+#include "idlc/options.h"
 
 namespace tidl {
 
 class CStubBodyGen : public CBodyGeneratorBase {
  public:
-  explicit CStubBodyGen(std::shared_ptr<Document> doc);
+  explicit CStubBodyGen(std::shared_ptr<Document> doc,
+      std::shared_ptr<Options> options);
   virtual ~CStubBodyGen() = default;
 
   void OnInitGen(std::ofstream& stream) override;
@@ -104,6 +106,9 @@ class CStubBodyGen : public CBodyGeneratorBase {
                                     const Attribute& attr);
   std::string GetTrustedModeString(const std::string& id,
                                    const Attribute& attr);
+
+ private:
+  std::shared_ptr<Options> options_;
 };
 
 }  // namespace tidl
index 6b702d0..a4dc8c4 100644 (file)
@@ -45,6 +45,34 @@ const char CB_INTERFACE_METHOD_FORMAT[] =
 R"__c_cb(
 [$$] = $$,)__c_cb";
 
+const char CB_INTERFACE_THREAD_METHODS[] =
+R"__c_cb(
+static int __run_pending_job(void)
+{
+    int cmd = -1;
+    job_h job;
+    int r = 0;
+
+    if (g_queue_is_empty(__job_queue)) {
+        _E("Empty queue");
+        return -1;
+    }
+
+    job = g_queue_pop_head(__job_queue);
+    rpc_port_parcel_read_int32(job->parcel, &cmd);
+    if (cmd > 1 && cmd < (sizeof(__##_method_table) / sizeof(__##_method_table[0]))) {
+        if (__##_method_table[cmd])
+        r = __##_method_table[cmd](job->port, job->parcel, job->context);
+    } else {
+        _E("Unknown Command(%d)", cmd);
+        r = -1;
+    }
+
+    __destroy_job(job);
+    return r;
+}
+)__c_cb";
+
 const char CB_INTERFACE_ON_CONNECTED[] =
 R"__c_cb(
 static void __##_on_connected(const char *sender, const char *instance, void *data)
@@ -118,6 +146,39 @@ static int __##_on_received(const char *sender, const char *instance, rpc_port_h
 }
 )__c_cb";
 
+const char CB_INTERFACE_THREAD_ON_RECEIVED[] =
+R"__c_cb(
+static int __##_on_received(const char *sender, const char *instance, rpc_port_h port, void *data)
+{
+    rpc_port_stub_##_context_h context;
+    int r;
+
+    _I("[__RPC_PORT__] sender(%s), instance(%s)", sender, instance);
+    context = __find_##_context(instance);
+    if (!context) {
+        _E("Failed to find ## context(%s)", instance);
+        return -1;
+    }
+    r = __add_thread_queue(port, context);
+
+    return r;
+}
+)__c_cb";
+
+const char CB_INTERFACE_THREAD_CREATE[] =
+R"__c_cb(
+__job_queue = g_queue_new();
+)__c_cb";
+
+const char CB_INTERFACE_THREAD_DESTROY[] =
+R"__c_cb(
+while (!g_queue_is_empty(__job_queue)) {
+    job = g_queue_pop_head(__job_queue);
+    __destroy_job(job);
+}
+g_queue_free(__job_queue);
+)__c_cb";
+
 const char CB_INTERFACE_REGISTER[] =
 R"__c_cb(
 int rpc_port_stub_##_register(rpc_port_stub_##_callback_s *callback, void *user_data)
@@ -141,6 +202,7 @@ int rpc_port_stub_##_register(rpc_port_stub_##_callback_s *callback, void *user_
         _E("Failed to create stub handle");
         return r;
     }
+$$
 
     r = rpc_port_stub_add_received_event_cb(__##_stub, __##_on_received, NULL);
     if (r != 0) {
@@ -191,6 +253,7 @@ R"__c_cb(
 int rpc_port_stub_##_unregister(void)
 {
     int r;
+$$
 
     if (!__##_stub)
         return -1;
@@ -202,6 +265,7 @@ int rpc_port_stub_##_unregister(void)
 
     r = rpc_port_stub_destroy(__##_stub);
     __##_stub = NULL;
+$$
 
     return r;
 }
@@ -211,19 +275,19 @@ const char CB_INTERFACE_CLIENT_NUMBER_GETTER[] =
 R"__c_cb(
 int rpc_port_stub_##_get_client_number(unsigned int *n)
 {
-       if (!n) {
-               _E("Invalid parameter");
-               return -1;
-       }
+    if (!n) {
+        _E("Invalid parameter");
+        return -1;
+    }
 
-       if (!__##_stub) {
-               _E("## Stub is not ready");
-               return -1;
-       }
+    if (!__##_stub) {
+        _E("## Stub is not ready");
+        return -1;
+    }
 
-       *n = g_list_length(__##_contexts);
+    *n = g_list_length(__##_contexts);
 
-       return 0;
+    return 0;
 }
 )__c_cb";
 
@@ -417,10 +481,81 @@ struct ##_context_s {
     rpc_port_h port;
     void *tag;
     rpc_port_stub_##_callback_s callback;
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    pthread_t thread;
+    bool run_wait_queue;
     void *user_data;
 };
 )__c_cb";
 
+const char CB_INTERFACE_THREAD_FEATURE[] =
+R"__c_cb(
+static GQueue *__job_queue;
+typedef struct {
+    rpc_port_parcel_h parcel;
+    rpc_port_h port;
+    void *context;
+} job_s;
+typedef job_s *job_h;
+
+static int __run_pending_job();
+static void *__wait_queue(void *data)
+{
+    rpc_port_stub_##_context_h context =
+        (rpc_port_stub_##_context_h)data;
+    while(context->run_wait_queue) {
+        pthread_mutex_lock(&context->mutex);
+            pthread_cond_wait(&context->cond, &context->mutex);
+        __run_pending_job();
+            pthread_mutex_unlock(&context->mutex);
+    }
+    return NULL;
+}
+
+static job_h __create_job(rpc_port_parcel_h parcel, rpc_port_h port,
+        void *context)
+{
+    job_h job = (job_s *)calloc(1, sizeof(job_s));
+
+    job->parcel = parcel;
+    job->port = port;
+    job->context = context;
+    return job;
+}
+
+static void __destroy_job(job_h job)
+{
+    rpc_port_parcel_destroy(job->parcel);
+    free(job);
+}
+
+static int __add_thread_queue(rpc_port_h port,
+    rpc_port_stub_ThreadSample_context_h context)
+{
+    job_h job;
+    int r;
+    rpc_port_parcel_h parcel;
+
+    r = rpc_port_parcel_create_from_port(&parcel, port);
+    if (r != 0) {
+        _E("Failed to create parcel from port");
+        return r;
+    }
+    job = __create_job(parcel, port, context);
+    g_queue_push_tail(__job_queue, job);
+    if (g_queue_is_empty(__job_queue)) {
+        _E("Empty queue ??");
+        return -1;
+    }
+
+    pthread_mutex_lock(&context->mutex);
+    pthread_cond_signal(&context->cond);
+    pthread_mutex_unlock(&context->mutex);
+    return 0;
+}
+)__c_cb";
+
 const char CB_INTERFACE_CONTEXT_CTOR[] =
 R"__c_cb(
 static struct ##_context_s *__create_##_context(const char *sender, const char *instance)
@@ -450,6 +585,14 @@ static struct ##_context_s *__create_##_context(const char *sender, const char *
 
     handle->callback = __##_callback;
     handle->user_data = __##_user_data;
+    pthread_mutex_init(&handle->mutex, NULL);
+    pthread_cond_init(&handle->cond, NULL);
+    handle->run_wait_queue = true;
+    if (pthread_create(&handle->thread, NULL,
+        &__wait_queue, (void *)handle) <0) {
+        _E("Fail to create thread");
+        return NULL;
+    }
 
     return handle;
 }
@@ -460,6 +603,8 @@ R"__c_cb(
 static void __destroy_##_context(gpointer data)
 {
     struct ##_context_s *handle = data;
+    void *return_val;
+    int err;
 
     if (!handle) {
         _E("Critical error!");
@@ -468,6 +613,18 @@ static void __destroy_##_context(gpointer data)
 
     free(handle->instance);
     free(handle->sender);
+
+    handle->run_wait_queue = false;
+    pthread_mutex_lock(&handle->mutex);
+    pthread_cond_signal(&handle->cond);
+    pthread_mutex_unlock(&handle->mutex);
+    err = pthread_join(handle->thread, &return_val);
+    if (0 != err)
+        _E("joining thread error [%d].", err);
+
+    pthread_cond_destroy(&handle->cond);
+    pthread_mutex_destroy(&handle->mutex);
+
     free(handle);
 }
 )__c_cb";
index 22faf9c..2b9cbb7 100644 (file)
@@ -82,7 +82,7 @@ int main(int argc, char** argv) {
       tidl::CStubHeaderGen stub_header(ps.GetDoc());
       stub_header.EnableNamespace(options->HasNamespace());
       stub_header.Run(options->GetOutput() + ".h");
-      tidl::CStubBodyGen stub_body(ps.GetDoc());
+      tidl::CStubBodyGen stub_body(ps.GetDoc(), options);
       stub_body.EnableNamespace(options->HasNamespace());
       stub_body.Run(options->GetOutput() + ".c");
     }