// announce to all thread for clients
// signo 10 : ip changed
if (g_dp_client_slots != NULL) {
- int i = 0;
- for (; i < DP_MAX_CLIENTS; i++) {
+ for (int i = 0; i < DP_MAX_CLIENTS; i++) {
if (g_dp_client_slots[i].thread > 0 &&
pthread_kill(g_dp_client_slots[i].thread, 0) != ESRCH)
pthread_kill(g_dp_client_slots[i].thread, SIGUSR1);
return DP_ERROR_TOO_MANY_DOWNLOADS;
}
/* Recreate the provider's working directories with the expected
 * permissions (u=rwx, g=rx, o=rx).
 * No-op when neither directory macro is defined at build time. */
static void __dp_rebuild_dir(void)
{
#ifdef PROVIDER_DIR
	dp_rebuild_dir(PROVIDER_DIR, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
#endif
#ifdef NOTIFY_DIR
	dp_rebuild_dir(NOTIFY_DIR, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
#endif
}
+static bool __dp_client_add_signal_handler()
+{
+ if (signal(SIGTERM, dp_terminate) == SIG_ERR ||
+ signal(SIGPIPE, SIG_IGN) == SIG_ERR ||
+ signal(SIGINT, dp_terminate) == SIG_ERR) {
+ TRACE_ERROR("failed to register signal callback");
+ return false;
+ }
+ return true;
+}
+
+static bool __dp_client_create_slots()
+{
dp_client_slots_fmt *clients =
(dp_client_slots_fmt *)calloc(DP_MAX_CLIENTS,
sizeof(dp_client_slots_fmt));
if (clients == NULL) {
TRACE_ERROR("failed to allocate client slots");
- g_main_loop_quit(event_loop);
- return 0;
+ return false;
}
g_dp_client_slots = clients;
- for (i = 0; i < DP_MAX_CLIENTS; i++)
+ for (int i = 0; i < DP_MAX_CLIENTS; i++)
dp_mutex_destroy(&clients[i].mutex); // clear mutex init
+ return true;
+}
- int maxfd = g_dp_sock;
+static int __dp_client_get_credential(int clientfd, dp_credential *credential)
+{
+#ifdef SO_PEERCRED // getting the info of client
+ socklen_t cr_len = sizeof(*credential);
+ if (getsockopt(clientfd, SOL_SOCKET, SO_PEERCRED,
+ credential, &cr_len) < 0) {
+ TRACE_ERROR("failed to cred from sock:%d", clientfd);
+ return DP_ERROR_PERMISSION_DENIED;
+ }
+#else // In case of not supported SO_PEERCRED
+ if (read(clientfd, credential, sizeof(dp_credential)) <= 0) {
+ TRACE_ERROR("failed to cred from client:%d", clientfd);
+ return DP_ERROR_PERMISSION_DENIED;
+ }
+#endif
+ return DP_ERROR_NONE;
+}
+
+static void __dp_client_handle_event()
+{
+ int errorcode = DP_ERROR_NONE;
+ struct sockaddr_un clientaddr;
+
+ // Anyway accept client.
+ socklen_t clientlen = sizeof(clientaddr);
+ int clientfd = accept(g_dp_sock, (struct sockaddr *)&clientaddr, &clientlen);
+ if (clientfd < 0) {
+ TRACE_ERROR("too many client ? accept failure");
+ // provider need the time of refresh.
+ errorcode = DP_ERROR_DISK_BUSY;
+ }
+
+ // blocking & timeout to prevent the lockup by client.
+ struct timeval tv_timeo = {5, 500000}; // 5.5 sec
+ if (setsockopt(clientfd, SOL_SOCKET, SO_RCVTIMEO, &tv_timeo,
+ sizeof(tv_timeo)) < 0) {
+ TRACE_ERROR("failed to set timeout in blocking socket");
+ errorcode = DP_ERROR_IO_ERROR;
+ }
+
+ dp_ipc_fmt ipc_info;
+ memset(&ipc_info, 0x00, sizeof(dp_ipc_fmt));
+ if (read(clientfd, &ipc_info, sizeof(dp_ipc_fmt)) <= 0 ||
+ ipc_info.section == DP_SEC_NONE ||
+ ipc_info.property != DP_PROP_NONE ||
+ ipc_info.id != -1 ||
+ ipc_info.size != 0) {
+ TRACE_ERROR("peer terminate ? ignore this connection");
+ errorcode = DP_ERROR_INVALID_PARAMETER;
+ }
+
+ dp_credential credential = {-1, -1, -1};
+ int ret = __dp_client_get_credential(clientfd, &credential);
+ if (ret != DP_ERROR_NONE)
+ errorcode = ret;
+
+ if (errorcode == DP_ERROR_NONE)
+ errorcode = __dp_db_open_client_manager();
+
+ if (errorcode == DP_ERROR_NONE) {
+ if (ipc_info.section == DP_SEC_INIT)
+ errorcode = __dp_client_new(clientfd, g_dp_client_slots, credential);
+ else
+ errorcode = DP_ERROR_INVALID_PARAMETER;
+ }
+
+ if (dp_ipc_query(clientfd, -1, ipc_info.section, DP_PROP_NONE, errorcode, 0) < 0)
+ TRACE_ERROR("check ipc sock:%d", clientfd);
+
+ if (errorcode != DP_ERROR_NONE) {
+ TRACE_ERROR("sock:%d id:%d section:%s property:%s errorcode:%s size:%zd",
+ clientfd, ipc_info.id,
+ dp_print_section(ipc_info.section),
+ dp_print_property(ipc_info.property),
+ dp_print_errorcode(ipc_info.errorcode),
+ ipc_info.size);
+ close(clientfd); // ban this client
+ }
+ if (errorcode == DP_ERROR_NO_SPACE || errorcode == DP_ERROR_DISK_BUSY) {
+ TRACE_ERROR("provider can't work anymore errorcode:%s", dp_print_errorcode(errorcode));
+ //break; // provider will be terminated after sending errorcode by each thread
+ }
+}
+
+// take care zombie client, slots
+static bool __dp_client_handle_zombie_client()
+{
+ unsigned connected_clients = 0;
+ for (int i = 0; i < DP_MAX_CLIENTS; i++) {
+
+ int locked = CLIENT_MUTEX_TRYLOCK(&g_dp_client_slots[i].mutex);
+ if (locked == EINVAL) { // not initialized
+ continue;
+ } else if (locked == EBUSY) { // already locked
+ connected_clients++;
+ continue;
+ }
+
+ if (locked == 0) { // locked
+ // if no client thread, requests should be checked here
+ // if no queued, connecting or downloading, close the slot
+ if (g_dp_client_slots[i].pkgname != NULL) {
+ if (g_dp_client_slots[i].thread == 0) {
+ dp_client_clear_requests(&g_dp_client_slots[i]);
+ if (g_dp_client_slots[i].client.requests == NULL) {
+ dp_client_slot_free(&g_dp_client_slots[i]);
+ CLIENT_MUTEX_UNLOCK(&g_dp_client_slots[i].mutex);
+ dp_mutex_destroy(&g_dp_client_slots[i].mutex);
+ continue;
+ }
+ }
+ connected_clients++;
+ }
+ CLIENT_MUTEX_UNLOCK(&g_dp_client_slots[i].mutex);
+ }
+ }
+ TRACE_DEBUG("%d clients are active now", connected_clients);
+ // terminating download-provider if no clients.
+ if (connected_clients == 0) {
+ if (__dp_manage_client_requests(g_dp_client_slots) <= 0) // if no crashed job
+ return false;
+ } else {
+ dp_queue_manager_wake_up();
+ dp_notification_manager_wake_up();
+ }
+ return true;
+}
+
+void *dp_client_manager(void *arg)
+{
+ fd_set rset, eset, listen_fdset, except_fdset;
+ struct timeval timeout; // for timeout of select
+ GMainLoop *event_loop = (GMainLoop *)arg;
+
+ if (__dp_client_add_signal_handler() == false)
+ goto ERR;
+
+ dp_notification_clear_ongoings();
+ __dp_rebuild_dir();
+
+ if (__dp_client_create_slots() == false)
+ goto ERR;
+
+ g_dp_sock = __dp_accept_socket_new();
+ if (g_dp_sock < 0) {
+ TRACE_ERROR("failed to open listen socket");
+ goto ERR;
+ }
FD_ZERO(&listen_fdset);
FD_ZERO(&except_fdset);
FD_SET(g_dp_sock, &listen_fdset);
FD_SET(g_dp_sock, &except_fdset);
+ int maxfd = g_dp_sock;
while (g_dp_sock >= 0) {
-
- int clientfd = -1;
-
// initialize timeout structure for calling timeout exactly
memset(&timeout, 0x00, sizeof(struct timeval));
timeout.tv_sec = DP_CARE_CLIENT_MANAGER_INTERVAL;
- credential.pid = -1;
- credential.uid = -1;
- credential.gid = -1;
rset = listen_fdset;
eset = except_fdset;
-
if (select((maxfd + 1), &rset, 0, &eset, &timeout) < 0) {
TRACE_ERROR("interrupted by terminating");
break;
if (FD_ISSET(g_dp_sock, &eset) > 0) {
TRACE_ERROR("exception of socket");
break;
- } else if (FD_ISSET(g_dp_sock, &rset) > 0) {
-
- // Anyway accept client.
- clientlen = sizeof(clientaddr);
- clientfd = accept(g_dp_sock, (struct sockaddr *)&clientaddr,
- &clientlen);
- if (clientfd < 0) {
- TRACE_ERROR("too many client ? accept failure");
- // provider need the time of refresh.
- errorcode = DP_ERROR_DISK_BUSY;
- }
-
- // blocking & timeout to prevent the lockup by client.
- struct timeval tv_timeo = {5, 500000}; // 5.5 sec
- if (setsockopt(clientfd, SOL_SOCKET, SO_RCVTIMEO, &tv_timeo,
- sizeof(tv_timeo)) < 0) {
- TRACE_ERROR("failed to set timeout in blocking socket");
- errorcode = DP_ERROR_IO_ERROR;
- }
-
- dp_ipc_fmt ipc_info;
- memset(&ipc_info, 0x00, sizeof(dp_ipc_fmt));
- if (read(clientfd, &ipc_info, sizeof(dp_ipc_fmt)) <= 0 ||
- ipc_info.section == DP_SEC_NONE ||
- ipc_info.property != DP_PROP_NONE ||
- ipc_info.id != -1 ||
- ipc_info.size != 0) {
- TRACE_ERROR("peer terminate ? ignore this connection");
- errorcode = DP_ERROR_INVALID_PARAMETER;
- }
-
-#ifdef SO_PEERCRED // getting the info of client
- socklen_t cr_len = sizeof(credential);
- if (getsockopt(clientfd, SOL_SOCKET, SO_PEERCRED,
- &credential, &cr_len) < 0) {
- TRACE_ERROR("failed to cred from sock:%d", clientfd);
- errorcode = DP_ERROR_PERMISSION_DENIED;
- }
-#else // In case of not supported SO_PEERCRED
- if (read(clientfd, &credential, sizeof(dp_credential)) <= 0) {
- TRACE_ERROR("failed to cred from client:%d", clientfd);
- errorcode = DP_ERROR_PERMISSION_DENIED;
- }
-#endif
-
- if (errorcode == DP_ERROR_NONE)
- errorcode = __dp_db_open_client_manager();
-
- if (errorcode == DP_ERROR_NONE) {
- if (ipc_info.section == DP_SEC_INIT) {
-
- // new client
- errorcode = __dp_client_new(clientfd, clients, credential);
-
- } else {
- errorcode = DP_ERROR_INVALID_PARAMETER;
- }
- }
- if (dp_ipc_query(clientfd, -1, ipc_info.section, DP_PROP_NONE, errorcode, 0) < 0)
- TRACE_ERROR("check ipc sock:%d", clientfd);
-
- if (errorcode != DP_ERROR_NONE) {
- TRACE_ERROR("sock:%d id:%d section:%s property:%s errorcode:%s size:%zd",
- clientfd, ipc_info.id,
- dp_print_section(ipc_info.section),
- dp_print_property(ipc_info.property),
- dp_print_errorcode(ipc_info.errorcode),
- ipc_info.size);
- close(clientfd); // ban this client
- }
- if (errorcode == DP_ERROR_NO_SPACE || errorcode == DP_ERROR_DISK_BUSY) {
- TRACE_ERROR("provider can't work anymore errorcode:%s", dp_print_errorcode(errorcode));
- //break; // provider will be terminated after sending errorcode by each thread
- }
-
- } else {
-
- // take care zombie client, slots
- unsigned connected_clients = 0;
- int i = 0;
- for (; i < DP_MAX_CLIENTS; i++) {
-
- int locked = CLIENT_MUTEX_TRYLOCK(&clients[i].mutex);
- if (locked == EINVAL) { // not initialized
- continue;
- } else if (locked == EBUSY) { // already locked
- connected_clients++;
- continue;
- }
-
- if (locked == 0) { // locked
-
- // if no client thread, requests should be checked here
- // if no queued, connecting or downloading, close the slot
- if (clients[i].pkgname != NULL) {
- if (clients[i].thread == 0) {
- dp_client_clear_requests(&clients[i]);
- if (clients[i].client.requests == NULL) {
- dp_client_slot_free(&clients[i]);
- CLIENT_MUTEX_UNLOCK(&clients[i].mutex);
- dp_mutex_destroy(&clients[i].mutex);
- continue;
- }
- }
- connected_clients++;
- }
- CLIENT_MUTEX_UNLOCK(&clients[i].mutex);
- }
- }
- TRACE_DEBUG("%d clients are active now", connected_clients);
- // terminating download-provider if no clients.
- if (connected_clients == 0) {
- if (__dp_manage_client_requests(clients) <= 0) // if no crashed job
- break;
- } else {
- dp_queue_manager_wake_up();
- dp_notification_manager_wake_up();
- }
}
+ if (FD_ISSET(g_dp_sock, &rset) > 0)
+ __dp_client_handle_event();
+ else if (__dp_client_handle_zombie_client() == false)
+ break;
+
}
if (g_dp_sock >= 0)
close(g_dp_sock);
// kill other clients
TRACE_DEBUG("try to deallocate the resources for all clients");
- for (i = 0; i < DP_MAX_CLIENTS; i++) {
- int locked = CLIENT_MUTEX_TRYLOCK(&clients[i].mutex);
+ for (int i = 0; i < DP_MAX_CLIENTS; i++) {
+ int locked = CLIENT_MUTEX_TRYLOCK(&g_dp_client_slots[i].mutex);
if (locked == EBUSY) { // already locked
- CLIENT_MUTEX_LOCK(&clients[i].mutex);
+ CLIENT_MUTEX_LOCK(&g_dp_client_slots[i].mutex);
} else if (locked == EINVAL) { // not initialized, empty slot
continue;
}
- dp_client_slot_free(&clients[i]);
- CLIENT_MUTEX_UNLOCK(&clients[i].mutex);
- dp_mutex_destroy(&clients[i].mutex);
+ dp_client_slot_free(&g_dp_client_slots[i]);
+ CLIENT_MUTEX_UNLOCK(&g_dp_client_slots[i].mutex);
+ dp_mutex_destroy(&g_dp_client_slots[i].mutex);
}
- free(clients);
+ free(g_dp_client_slots);
// free all resources
TRACE_INFO("client-manager's working is done");
+
+ERR:
g_main_loop_quit(event_loop);
return 0;
}