Fix daemon crash in multi-threaded environment 36/70236/3
authorJaemin Ryu <jm77.ryu@samsung.com>
Thu, 19 May 2016 02:28:16 +0000 (11:28 +0900)
committerSungbae Yoo <sungbae.yoo@samsung.com>
Fri, 20 May 2016 00:56:28 +0000 (17:56 -0700)
Change-Id: Ibbc67875481776e1b23cd280d8ea95ce8a713397
Signed-off-by: Jaemin Ryu <jm77.ryu@samsung.com>
14 files changed:
common/mainloop.cpp
common/rmi/client.cpp
common/rmi/client.h
common/rmi/connection.cpp
common/rmi/message.cpp
common/rmi/message.h
common/rmi/notification.cpp
common/rmi/notification.h
common/rmi/service.cpp
common/rmi/service.h
libs/policy-client.cpp
libs/restriction.cpp
tests/api/CMakeLists.txt
tests/api/context.c [new file with mode: 0644]

index 5ecb608..ecece29 100644 (file)
@@ -64,7 +64,7 @@ void Mainloop::addEventSource(const int fd, const Event events, Callback&& callb
     std::lock_guard<Mutex> lock(mutex);
 
     if (callbacks.find(fd) != callbacks.end()) {
-        throw Exception(GetSystemErrorMessage());
+        throw Exception("Event source already registered");
     }
 
     ::memset(&event, 0, sizeof(epoll_event));
index f870851..de85d85 100644 (file)
@@ -37,6 +37,7 @@ void Client::connect()
 
 int Client::unsubscribe(const std::string& provider, int id)
 {
+    // file descriptor(id) is automatically closed when mainloop callback is destroyed.
     mainloop.removeEventSource(id);
     return 0;
 }
@@ -47,12 +48,8 @@ int Client::subscribe(const std::string& provider, const std::string& name)
     request.packParameters(name);
     connection->send(request);
 
-    Message reply = connection->dispatch();
-    if (reply.isError()) {
-        return -1;
-    }
-
     FileDescriptor response;
+    Message reply = connection->dispatch();
     reply.disclose(response);
 
     return response.fileDescriptor;
index 2adbcd1..0d3761a 100644 (file)
@@ -98,9 +98,8 @@ Type Client::methodCall(const std::string& method, Args&&... args)
     request.packParameters(std::forward<Args>(args)...);
     connection->send(request);
 
-    Message reply = connection->dispatch();
-
     Type response;
+    Message reply = connection->dispatch();
     reply.disclose<Type>(response);
 
     return response;
index b67e40f..f16a529 100644 (file)
@@ -43,9 +43,15 @@ void Connection::send(const Message& message) const
 Message Connection::dispatch() const
 {
     Message message;
-
     std::lock_guard<std::mutex> lock(receiveMutex);
+
     message.decode(socket);
+    if (message.isError()) {
+        std::string exception;
+        message.disclose(exception);
+
+        throw runtime::Exception(exception);
+    }
 
     return message;
 }
index 4555218..3a9e0f5 100644 (file)
@@ -81,9 +81,12 @@ Message Message::createReplyMessage() const
     return Message(id(), Reply, target());
 }
 
-Message Message::createErrorMessage() const
+Message Message::createErrorMessage(const std::string& message) const
 {
-    return Message(id(), Error, target());
+    Message error(id(), Error, target());
+    error.enclose(message);
+
+    return error;
 }
 
 template<> void Message::enclose(FileDescriptor&& fd)
index 41f7d35..8a00fba 100644 (file)
@@ -55,7 +55,7 @@ public:
 
     // [TBD] Take arguments
     Message createReplyMessage() const;
-    Message createErrorMessage() const;
+    Message createErrorMessage(const std::string& message) const;
 
     unsigned int id() const
     {
index e36198d..d2b2f4e 100644 (file)
@@ -47,7 +47,7 @@ SubscriptionId Notification::createSubscriber()
     }
 
     std::lock_guard<std::mutex> lock(subscriberLock);
-       subscribers.emplace_back(std::make_shared<Socket>(fds[0]));
+       subscribers.push_back(std::make_shared<Socket>(fds[0]));
 
     return SubscriptionId(fds[0], fds[1]);
 }
@@ -56,13 +56,14 @@ int Notification::removeSubscriber(const int id)
 {
     std::lock_guard<std::mutex> lock(subscriberLock);
 
-    std::vector<std::shared_ptr<Socket>>::iterator it = subscribers.begin();
+    std::list<std::shared_ptr<Socket>>::iterator it = subscribers.begin();
 
     while (it != subscribers.end()) {
        if ((*it)->getFd() == id) {
             subscribers.erase(it);
             return 0;
        }
+       ++it;
     }
 
     return -1;
index 72162aa..7da9e51 100644 (file)
@@ -21,6 +21,7 @@
 #include <vector>
 #include <mutex>
 #include <unordered_map>
+#include <list>
 #include <utility>
 #include <memory>
 
@@ -47,7 +48,7 @@ public:
 
 private:
        std::string signalName;
-       std::vector<std::shared_ptr<Socket>> subscribers;
+       std::list<std::shared_ptr<Socket>> subscribers;
     std::mutex subscriberLock;
 };
 
@@ -59,7 +60,7 @@ void Notification::notify(Args&&... args)
 
     std::lock_guard<std::mutex> lock(subscriberLock);
 
-    for (std::shared_ptr<Socket>& subscriber : subscribers) {
+    for (const std::shared_ptr<Socket>& subscriber : subscribers) {
         try {
             msg.encode(*subscriber);
         } catch (runtime::Exception& e) {
index 52cae3a..c097ec5 100644 (file)
@@ -22,6 +22,7 @@
 #include "exception.h"
 #include "service.h"
 #include "message.h"
+#include "audit/logger.h"
 
 namespace rmi {
 
@@ -60,8 +61,6 @@ void Service::stop()
 
 Service::ConnectionRegistry::iterator Service::getConnectionIterator(const int id)
 {
-    std::lock_guard<std::mutex> lock(stateLock);
-
     return std::find_if(connectionRegistry.begin(), connectionRegistry.end(),
                         [id](const std::shared_ptr<Connection>& connection) {
         return id == connection->getFd();
@@ -72,6 +71,8 @@ void Service::setNewConnectionCallback(const ConnectionCallback& connectionCallb
 {
     auto callback = [connectionCallback, this](const std::shared_ptr<Connection>& connection) {
         auto handle = [&](int fd, runtime::Mainloop::Event event) {
+            std::lock_guard<std::mutex> lock(stateLock);
+
             auto iter = getConnectionIterator(fd);
             if (iter == connectionRegistry.end()) {
                 return;
@@ -91,6 +92,7 @@ void Service::setNewConnectionCallback(const ConnectionCallback& connectionCallb
             mainloop.addEventSource(connection->getFd(),
                                     EPOLLIN | EPOLLHUP | EPOLLRDHUP,
                                     handle);
+            std::lock_guard<std::mutex> lock(stateLock);
             connectionRegistry.push_back(connection);
         }
     };
@@ -120,12 +122,12 @@ void Service::createNotification(const std::string& name)
         throw runtime::Exception("Notification already registered");
     }
 
-    notificationRegistry.emplace(name, name);
+    notificationRegistry.emplace(name, Notification(name));
 }
 
 int Service::subscribeNotification(const std::string& name)
 {
-    auto closeHandler = [&, name](int fd, runtime::Mainloop::Event event) {
+    auto closeHandler = [&, name, this](int fd, runtime::Mainloop::Event event) {
         if ((event & EPOLLHUP) || (event & EPOLLRDHUP)) {
             unsubscribeNotification(name, fd);
             return;
@@ -146,6 +148,7 @@ int Service::subscribeNotification(const std::string& name)
         mainloop.addEventSource(slot.first, EPOLLHUP | EPOLLRDHUP, closeHandler);
         return slot.second;
     } catch (runtime::Exception& e) {
+        ERROR(e.what());
         return -1;
     }
 
@@ -164,34 +167,40 @@ int Service::unsubscribeNotification(const std::string& name, const int id)
     Notification& notification = notificationRegistry[name];
     notificationLock.unlock();
 
-    notification.removeSubscriber(id);
-
     mainloop.removeEventSource(id);
 
+    notification.removeSubscriber(id);
+
     return 0;
 }
 
 void Service::onMessageProcess(const std::shared_ptr<Connection>& connection)
 {
-    auto process = [&](Message& request) {
+    // The connection object can be destroyed in main-thread when peer is closed.
+    // To make sure that the connection object is valid on that situation,
+    // we should increase the reference count of the shared_ptr by capturing it as value
+    auto process = [&, connection](Message& request) {
         try {
-            //stateLock.lock();
             std::shared_ptr<MethodDispatcher> methodDispatcher = methodRegistry.at(request.target());
-            //stateLock.unlock();
 
             // [TBD] Request authentication before dispatching method handler.
             processingContext = ProcessingContext(connection);
             connection->send((*methodDispatcher)(request));
         } catch (std::exception& e) {
-            std::cerr << e.what() << std::endl;
-            connection->send(request.createErrorMessage());
+            try {
+                // Forward the exception to the peer
+                connection->send(request.createErrorMessage(e.what()));
+            } catch (std::exception& ex) {
+                // The connection is abnormally closed by the peer.
+                ERROR(ex.what());
+            }
         }
     };
 
     try {
         workqueue.submit(std::bind(process, connection->dispatch()));
-    } catch (runtime::Exception& e) {
-        std::cerr << e.what() << std::endl;
+    } catch (std::exception& e) {
+        ERROR(e.what());
     }
 }
 
index 8b1163a..b2d3bc5 100644 (file)
@@ -22,6 +22,7 @@
 #include <functional>
 #include <memory>
 #include <vector>
+#include <list>
 #include <unordered_map>
 #include <thread>
 
@@ -116,7 +117,7 @@ private:
         Credentials credentials;
     };
 
-    typedef std::vector<std::shared_ptr<Connection>> ConnectionRegistry;
+    typedef std::list<std::shared_ptr<Connection>> ConnectionRegistry;
     typedef std::function<void(const std::shared_ptr<Connection>& connection)> CallbackDispatcher;
 
     typedef std::function<Message(Message& message)> MethodDispatcher;
@@ -140,6 +141,7 @@ private:
     runtime::ThreadPool workqueue;
     std::mutex stateLock;
     std::mutex notificationLock;
+    std::mutex methodRegistryLock;
 
     static thread_local ProcessingContext processingContext;
 };
@@ -156,7 +158,7 @@ void Service::setMethodHandler(const std::string& method,
         return reply;
     };
 
-    std::lock_guard<std::mutex> lock(stateLock);
+    std::lock_guard<std::mutex> lock(methodRegistryLock);
 
     if (methodRegistry.count(method)) {
         throw runtime::Exception("Method handler already registered");
@@ -168,6 +170,8 @@ void Service::setMethodHandler(const std::string& method,
 template <typename... Args>
 void Service::notify(const std::string& name, Args&&... args)
 {
+    std::lock_guard<std::mutex> lock(notificationLock);
+
     Notification& slot = notificationRegistry[name];
     slot.notify(name, std::forward<Args>(args)...);
 }
index 43db759..8a61037 100644 (file)
@@ -32,6 +32,7 @@ DevicePolicyContext::DevicePolicyContext() noexcept
 
 DevicePolicyContext::~DevicePolicyContext() noexcept
 {
+    disconnect();
 }
 
 int DevicePolicyContext::connect(const std::string& address) noexcept
@@ -64,8 +65,13 @@ int DevicePolicyContext::subscribePolicyChange(const std::string& name,
         listener(policy.c_str(), state.c_str(), data);
     };
 
-    return client->subscribe<std::string, std::string>(SUBSCRIBER_REGISTER,
-                                                       name, listenerDispatcher);
+    try {
+        return client->subscribe<std::string, std::string>(SUBSCRIBER_REGISTER,
+                                                           name, listenerDispatcher);
+    } catch (runtime::Exception& e) {
+        std::cout << e.what() << std::endl;
+        return -1;
+    }
 }
 
 int DevicePolicyContext::unsubscribePolicyChange(int subscriberId)
@@ -81,8 +87,13 @@ int DevicePolicyContext::subscribeSignal(const std::string& name,
         listener(from.c_str(), object.c_str(), data);
     };
 
-    return client->subscribe<std::string, std::string, std::string>
-                            (SUBSCRIBER_REGISTER, name, listenerDispatcher);
+    try {
+        return client->subscribe<std::string, std::string, std::string>(SUBSCRIBER_REGISTER,
+                                                                        name, listenerDispatcher);
+    } catch (runtime::Exception& e) {
+        std::cout << e.what() << std::endl;
+        return -1;
+    }
 }
 
 int DevicePolicyContext::unsubscribeSignal(int subscriberId)
index f0ae728..3ae91d4 100644 (file)
@@ -172,7 +172,6 @@ int RestrictionPolicy::getLocationState()
        }
 }
 
-
 int RestrictionPolicy::setWifiState(bool enable)
 {
     try {
index be570e8..2dd7159 100644 (file)
@@ -17,6 +17,7 @@ SET(API_TEST_TARGET   "dpm-api-tests")
 
 SET(API_TEST_SOURCES main.c
                      testbench.c
+                                        context.c
                      bluetooth.c
 #                     password.c
                                         restriction.c
diff --git a/tests/api/context.c b/tests/api/context.c
new file mode 100644 (file)
index 0000000..8feb3af
--- /dev/null
@@ -0,0 +1,153 @@
+// Copyright (c) 2015 Samsung Electronics Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the License);
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+
+#include <dpm/restriction.h>
+
+#include "testbench.h"
+
+#define MAX_WORKER_THREADS  8
+#define MAX_ITERATIONS      1000
+
+volatile int completed = 0;
+
+void device_policy_context_callback(const char* name, const char* state, void* user_data)
+{
+    int *triggered = user_data;
+    printf("*");
+    *triggered = 1;
+}
+
+void* getter(void* data)
+{
+    int i = 0;
+    dpm_context_h context;
+    volatile int triggered = 0;;
+
+    printf("Policy receiver %d is ready\n", *((int *)data));
+
+    while(1) {
+        context = dpm_context_create();
+        if (context == NULL) {
+            printf("Failed to create client context\n");
+            return (void *)TEST_FAILED;
+        }
+
+        int id;
+        dpm_context_add_policy_changed_cb(context, "camera", device_policy_context_callback, (void *)&triggered, &id);
+
+        while (!triggered) {
+            if (completed) {
+                dpm_context_remove_policy_changed_cb(context, id);
+                dpm_context_destroy(context);
+                return (void *)TEST_SUCCESSED;
+            }
+        }
+
+        triggered = 0;
+
+        if ((i % 10) == 0) {
+            printf("\n");
+        }
+
+        dpm_context_remove_policy_changed_cb(context, id);
+        dpm_context_destroy(context);
+
+        printf("G");
+
+        i++;
+    }
+
+    return (void *)TEST_SUCCESSED;
+}
+
+void* setter(void *data)
+{
+    int i;
+    dpm_context_h context;
+
+    printf("Thread setter %d is ready\n", *((int *)data));
+
+    for (i = 0; i < MAX_ITERATIONS; i++) {
+        context = dpm_context_create();
+        if (context == NULL) {
+            printf("Failed to create client context\n");
+            completed = 1;
+            return (void *)TEST_FAILED;
+        }
+
+        dpm_restriction_policy_h policy = dpm_context_acquire_restriction_policy(context);
+
+        int state = 0;
+
+        dpm_restriction_get_camera_state(policy, &state);
+        dpm_restriction_set_camera_state(policy, state ? 0 : 1);
+
+        dpm_context_release_restriction_policy(context, policy);
+
+        if ((i % 10) == 0) {
+            printf("\n");
+        }
+
+        printf("S");
+
+        dpm_context_destroy(context);
+
+    }
+    printf("\n");
+
+    completed = 1;
+
+    return (void *)TEST_SUCCESSED;
+}
+
+static int device_policy_context(struct testcase* tc)
+{
+    pthread_t handle[MAX_WORKER_THREADS];
+    int i, ret, status, idx[MAX_WORKER_THREADS];
+
+    for (i = 0; i < MAX_WORKER_THREADS; i++) {
+        idx[i] = i;
+
+        if (i == 0) {
+            pthread_create(&handle[i], NULL, setter, (void *)&idx[i]);
+        } else {
+            pthread_create(&handle[i], NULL, getter, (void *)&idx[i]);
+        }
+    }
+
+    ret = TEST_SUCCESSED;
+    for (i = 0; i < MAX_WORKER_THREADS; i++) {
+        pthread_join(handle[i], (void *)&status);
+        if (status == TEST_FAILED) {
+            ret = TEST_FAILED;
+        }
+    }
+
+    return ret;
+}
+
+struct testcase device_policy_context_testcase = {
+    .description = "device policy context",
+    .handler = device_policy_context
+};
+
+void TESTCASE_CONSTRUCTOR device_policy_context_build_testcase(void)
+{
+    testbench_populate_testcase(&device_policy_context_testcase);
+}