std::lock_guard<Mutex> lock(mutex);
if (callbacks.find(fd) != callbacks.end()) {
- throw Exception(GetSystemErrorMessage());
+ throw Exception("Event source already registered");
}
::memset(&event, 0, sizeof(epoll_event));
int Client::unsubscribe(const std::string& provider, int id)
{
+ // file descriptor(id) is automatically closed when mainloop callback is destroyed.
mainloop.removeEventSource(id);
return 0;
}
request.packParameters(name);
connection->send(request);
- Message reply = connection->dispatch();
- if (reply.isError()) {
- return -1;
- }
-
FileDescriptor response;
+ Message reply = connection->dispatch();
reply.disclose(response);
return response.fileDescriptor;
request.packParameters(std::forward<Args>(args)...);
connection->send(request);
- Message reply = connection->dispatch();
-
Type response;
+ Message reply = connection->dispatch();
reply.disclose<Type>(response);
return response;
Message Connection::dispatch() const
{
Message message;
-
std::lock_guard<std::mutex> lock(receiveMutex);
+
message.decode(socket);
+ if (message.isError()) {
+ std::string exception;
+ message.disclose(exception);
+
+ throw runtime::Exception(exception);
+ }
return message;
}
return Message(id(), Reply, target());
}
-Message Message::createErrorMessage() const
+Message Message::createErrorMessage(const std::string& message) const
{
- return Message(id(), Error, target());
+ Message error(id(), Error, target());
+ error.enclose(message);
+
+ return error;
}
template<> void Message::enclose(FileDescriptor&& fd)
// [TBD] Take arguments
Message createReplyMessage() const;
- Message createErrorMessage() const;
+ Message createErrorMessage(const std::string& message) const;
unsigned int id() const
{
}
std::lock_guard<std::mutex> lock(subscriberLock);
- subscribers.emplace_back(std::make_shared<Socket>(fds[0]));
+ subscribers.push_back(std::make_shared<Socket>(fds[0]));
return SubscriptionId(fds[0], fds[1]);
}
{
std::lock_guard<std::mutex> lock(subscriberLock);
- std::vector<std::shared_ptr<Socket>>::iterator it = subscribers.begin();
+ std::list<std::shared_ptr<Socket>>::iterator it = subscribers.begin();
while (it != subscribers.end()) {
if ((*it)->getFd() == id) {
subscribers.erase(it);
return 0;
}
+ ++it;
}
return -1;
#include <vector>
#include <mutex>
#include <unordered_map>
+#include <list>
#include <utility>
#include <memory>
private:
std::string signalName;
- std::vector<std::shared_ptr<Socket>> subscribers;
+ std::list<std::shared_ptr<Socket>> subscribers;
std::mutex subscriberLock;
};
std::lock_guard<std::mutex> lock(subscriberLock);
- for (std::shared_ptr<Socket>& subscriber : subscribers) {
+ for (const std::shared_ptr<Socket>& subscriber : subscribers) {
try {
msg.encode(*subscriber);
} catch (runtime::Exception& e) {
#include "exception.h"
#include "service.h"
#include "message.h"
+#include "audit/logger.h"
namespace rmi {
Service::ConnectionRegistry::iterator Service::getConnectionIterator(const int id)
{
- std::lock_guard<std::mutex> lock(stateLock);
-
return std::find_if(connectionRegistry.begin(), connectionRegistry.end(),
[id](const std::shared_ptr<Connection>& connection) {
return id == connection->getFd();
{
auto callback = [connectionCallback, this](const std::shared_ptr<Connection>& connection) {
auto handle = [&](int fd, runtime::Mainloop::Event event) {
+ std::lock_guard<std::mutex> lock(stateLock);
+
auto iter = getConnectionIterator(fd);
if (iter == connectionRegistry.end()) {
return;
mainloop.addEventSource(connection->getFd(),
EPOLLIN | EPOLLHUP | EPOLLRDHUP,
handle);
+ std::lock_guard<std::mutex> lock(stateLock);
connectionRegistry.push_back(connection);
}
};
throw runtime::Exception("Notification already registered");
}
- notificationRegistry.emplace(name, name);
+ notificationRegistry.emplace(name, Notification(name));
}
int Service::subscribeNotification(const std::string& name)
{
- auto closeHandler = [&, name](int fd, runtime::Mainloop::Event event) {
+ auto closeHandler = [&, name, this](int fd, runtime::Mainloop::Event event) {
if ((event & EPOLLHUP) || (event & EPOLLRDHUP)) {
unsubscribeNotification(name, fd);
return;
mainloop.addEventSource(slot.first, EPOLLHUP | EPOLLRDHUP, closeHandler);
return slot.second;
} catch (runtime::Exception& e) {
+ ERROR(e.what());
return -1;
}
Notification& notification = notificationRegistry[name];
notificationLock.unlock();
- notification.removeSubscriber(id);
-
mainloop.removeEventSource(id);
+ notification.removeSubscriber(id);
+
return 0;
}
void Service::onMessageProcess(const std::shared_ptr<Connection>& connection)
{
- auto process = [&](Message& request) {
+ // The connection object can be destroyed in main-thread when peer is closed.
+ // To make sure that the connection object is valid on that situation,
+ // we should increase the reference count of the shared_ptr by capturing it as value
+ auto process = [&, connection](Message& request) {
try {
- //stateLock.lock();
std::shared_ptr<MethodDispatcher> methodDispatcher = methodRegistry.at(request.target());
- //stateLock.unlock();
// [TBD] Request authentication before dispatching method handler.
processingContext = ProcessingContext(connection);
connection->send((*methodDispatcher)(request));
} catch (std::exception& e) {
- std::cerr << e.what() << std::endl;
- connection->send(request.createErrorMessage());
+ try {
+ // Forward the exception to the peer
+ connection->send(request.createErrorMessage(e.what()));
+ } catch (std::exception& ex) {
+ // The connection is abnormally closed by the peer.
+ ERROR(ex.what());
+ }
}
};
try {
workqueue.submit(std::bind(process, connection->dispatch()));
- } catch (runtime::Exception& e) {
- std::cerr << e.what() << std::endl;
+ } catch (std::exception& e) {
+ ERROR(e.what());
}
}
#include <functional>
#include <memory>
#include <vector>
+#include <list>
#include <unordered_map>
#include <thread>
Credentials credentials;
};
- typedef std::vector<std::shared_ptr<Connection>> ConnectionRegistry;
+ typedef std::list<std::shared_ptr<Connection>> ConnectionRegistry;
typedef std::function<void(const std::shared_ptr<Connection>& connection)> CallbackDispatcher;
typedef std::function<Message(Message& message)> MethodDispatcher;
runtime::ThreadPool workqueue;
std::mutex stateLock;
std::mutex notificationLock;
+ std::mutex methodRegistryLock;
static thread_local ProcessingContext processingContext;
};
return reply;
};
- std::lock_guard<std::mutex> lock(stateLock);
+ std::lock_guard<std::mutex> lock(methodRegistryLock);
if (methodRegistry.count(method)) {
throw runtime::Exception("Method handler already registered");
template <typename... Args>
void Service::notify(const std::string& name, Args&&... args)
{
+ std::lock_guard<std::mutex> lock(notificationLock);
+
Notification& slot = notificationRegistry[name];
slot.notify(name, std::forward<Args>(args)...);
}
DevicePolicyContext::~DevicePolicyContext() noexcept
{
+ disconnect();
}
int DevicePolicyContext::connect(const std::string& address) noexcept
listener(policy.c_str(), state.c_str(), data);
};
- return client->subscribe<std::string, std::string>(SUBSCRIBER_REGISTER,
- name, listenerDispatcher);
+ try {
+ return client->subscribe<std::string, std::string>(SUBSCRIBER_REGISTER,
+ name, listenerDispatcher);
+ } catch (runtime::Exception& e) {
+ std::cout << e.what() << std::endl;
+ return -1;
+ }
}
int DevicePolicyContext::unsubscribePolicyChange(int subscriberId)
listener(from.c_str(), object.c_str(), data);
};
- return client->subscribe<std::string, std::string, std::string>
- (SUBSCRIBER_REGISTER, name, listenerDispatcher);
+ try {
+ return client->subscribe<std::string, std::string, std::string>(SUBSCRIBER_REGISTER,
+ name, listenerDispatcher);
+ } catch (runtime::Exception& e) {
+ std::cout << e.what() << std::endl;
+ return -1;
+ }
}
int DevicePolicyContext::unsubscribeSignal(int subscriberId)
}
}
-
int RestrictionPolicy::setWifiState(bool enable)
{
try {
SET(API_TEST_SOURCES main.c
testbench.c
+ context.c
bluetooth.c
# password.c
restriction.c
--- /dev/null
+// Copyright (c) 2015 Samsung Electronics Co., Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the License);
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+#include <stdio.h>
+#include <unistd.h>
+#include <pthread.h>
+
+#include <dpm/restriction.h>
+
+#include "testbench.h"
+
+#define MAX_WORKER_THREADS 8
+#define MAX_ITERATIONS 1000
+
+volatile int completed = 0;
+
+void device_policy_context_callback(const char* name, const char* state, void* user_data)
+{
+ int *triggered = user_data;
+ printf("*");
+ *triggered = 1;
+}
+
+void* getter(void* data)
+{
+ int i = 0;
+ dpm_context_h context;
+ volatile int triggered = 0;;
+
+ printf("Policy receiver %d is ready\n", *((int *)data));
+
+ while(1) {
+ context = dpm_context_create();
+ if (context == NULL) {
+ printf("Failed to create client context\n");
+ return (void *)TEST_FAILED;
+ }
+
+ int id;
+ dpm_context_add_policy_changed_cb(context, "camera", device_policy_context_callback, (void *)&triggered, &id);
+
+ while (!triggered) {
+ if (completed) {
+ dpm_context_remove_policy_changed_cb(context, id);
+ dpm_context_destroy(context);
+ return (void *)TEST_SUCCESSED;
+ }
+ }
+
+ triggered = 0;
+
+ if ((i % 10) == 0) {
+ printf("\n");
+ }
+
+ dpm_context_remove_policy_changed_cb(context, id);
+ dpm_context_destroy(context);
+
+ printf("G");
+
+ i++;
+ }
+
+ return (void *)TEST_SUCCESSED;
+}
+
+void* setter(void *data)
+{
+ int i;
+ dpm_context_h context;
+
+ printf("Thread setter %d is ready\n", *((int *)data));
+
+ for (i = 0; i < MAX_ITERATIONS; i++) {
+ context = dpm_context_create();
+ if (context == NULL) {
+ printf("Failed to create client context\n");
+ completed = 1;
+ return (void *)TEST_FAILED;
+ }
+
+ dpm_restriction_policy_h policy = dpm_context_acquire_restriction_policy(context);
+
+ int state = 0;
+
+ dpm_restriction_get_camera_state(policy, &state);
+ dpm_restriction_set_camera_state(policy, state ? 0 : 1);
+
+ dpm_context_release_restriction_policy(context, policy);
+
+ if ((i % 10) == 0) {
+ printf("\n");
+ }
+
+ printf("S");
+
+ dpm_context_destroy(context);
+
+ }
+ printf("\n");
+
+ completed = 1;
+
+ return (void *)TEST_SUCCESSED;
+}
+
+static int device_policy_context(struct testcase* tc)
+{
+ pthread_t handle[MAX_WORKER_THREADS];
+ int i, ret, status, idx[MAX_WORKER_THREADS];
+
+ for (i = 0; i < MAX_WORKER_THREADS; i++) {
+ idx[i] = i;
+
+ if (i == 0) {
+ pthread_create(&handle[i], NULL, setter, (void *)&idx[i]);
+ } else {
+ pthread_create(&handle[i], NULL, getter, (void *)&idx[i]);
+ }
+ }
+
+ ret = TEST_SUCCESSED;
+ for (i = 0; i < MAX_WORKER_THREADS; i++) {
+ pthread_join(handle[i], (void *)&status);
+ if (status == TEST_FAILED) {
+ ret = TEST_FAILED;
+ }
+ }
+
+ return ret;
+}
+
+struct testcase device_policy_context_testcase = {
+ .description = "device policy context",
+ .handler = device_policy_context
+};
+
+void TESTCASE_CONSTRUCTOR device_policy_context_build_testcase(void)
+{
+ testbench_populate_testcase(&device_policy_context_testcase);
+}