3 * Copyright (c) 2012 Samsung Electronics Co., Ltd.
5 * Licensed under the Apache License, Version 2.0 (the License);
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
24 #include "utility/fw_assert.h"
25 #include "utility/sync_util.h"
26 #include "utility/fw_async_queue_internal.h"
27 #include "engine-controller/interface.h"
28 #include "engine-controller/interface_internal.h"
29 #include "engine-controller/internal.h"
30 #include "engine-controller/task_message.h"
31 #include "engine-controller/task_spec.h"
32 #include "engine-controller/task_pool.h"
33 #include "engine-controller/task_spec_internal.h"
34 #include "engine-controller/task_error_internal.h"
37 #define EXPORT_API __attribute__ ((visibility("default")))
40 #ifndef SYNC_AGENT_LOG
42 #define LOG_TAG "AF_EC"
45 GPrivate *ec_cancel_task = NULL;
47 typedef void *(*ec_engine_controller_logic_func_cb) (ec_engine_controller_t *);
49 typedef void *(*ec_thread_start_routine_cb) (void *);
51 static ec_engine_controller_t *global_engine_controller = NULL;
52 static ec_engine_controller_receiver_t *global_engine_controller_receiver = NULL;
54 typedef struct ec_sync_task_output_s ec_sync_task_output_t;
55 struct ec_sync_task_output_s {
56 pthread_mutex_t mutex;
57 pthread_cond_t output_set_done_cond;
58 sync_agent_ec_boolean output_set_done;
59 sync_agent_ec_task_error_e task_error;
60 sync_agent_ec_uint out_param_cnt;
61 sync_agent_ec_param_param_s **out_param_array;
64 static ec_sync_task_output_t *_alloc_and_init_sync_task_output(void)
68 ec_sync_task_output_t *output = (ec_sync_task_output_t *) calloc(1, sizeof(ec_sync_task_output_t));
73 output->output_set_done = false;
74 pthread_mutex_init(&(output->mutex), NULL);
75 pthread_cond_init(&(output->output_set_done_cond), NULL);
85 static void _destory_sync_task_output(ec_sync_task_output_t * output)
90 pthread_cond_destroy(&(output->output_set_done_cond));
91 pthread_mutex_destroy(&(output->mutex));
99 static void _static_sync_task_finish_callback(sync_agent_ec_task_error_e task_error, sync_agent_ec_uint out_param_cnt, sync_agent_ec_param_param_s ** out_param_array, sync_agent_ec_pointer usr_data)
103 retm_if(usr_data == NULL, "ec_sync_task_output_t is NULL !!");
105 ec_sync_task_output_t *output = (ec_sync_task_output_t *) usr_data;
107 pthread_mutex_lock(&(output->mutex));
109 output->task_error = task_error;
110 output->out_param_cnt = out_param_cnt;
111 output->out_param_array = out_param_array;
113 g_atomic_int_set(&(output->output_set_done), true);
114 _DEBUG_TRACE("before conditional signal\n");
115 /* wake up receiver */
116 pthread_cond_signal(&(output->output_set_done_cond));
117 pthread_mutex_unlock(&(output->mutex));
118 _DEBUG_TRACE("after conditional signal\n");
120 _DEBUG_TRACE("finished\n");
125 static void _send_parent_EC_TASK_DONE_msg_to_itseft(ec_task_t * parent_task)
129 _DEBUG_TRACE("called. task_name = %s\n", parent_task->task_spec->task_name);
130 ec_task_message_t *new_task_msg = ec_task_message_create(EC_TASK_DONE, parent_task, 0);
131 if (new_task_msg == NULL) {
132 _DEBUG_ERROR("error. out of memory. we can not create parent container done message.");
134 /* TODO : error handling */
136 ec_send_msg_to_engine_controller_with_compare_priority(new_task_msg, ec_compare_priority_of_task_message_append_way, NULL);
141 static void _send_new_runnerable_child_task_msgs_to_itself(GSList * runnable_child_task_list, ec_task_message_t * received_task_msg)
145 retm_if(runnable_child_task_list == NULL, "GSList is NULL !!");
146 retm_if(received_task_msg == NULL, "ec_task_message_t is NULL !!");
148 ec_task_message_t *new_task_msg = NULL;
149 ec_task_t *child_task = NULL;
151 for (iter = runnable_child_task_list; iter != NULL; iter = g_slist_next(iter)) {
152 child_task = (ec_task_t *) (iter->data);
153 new_task_msg = ec_task_message_create(EC_TASK_START, child_task, 0);
154 if (received_task_msg->task_state == EC_TASK_START) {
155 /* note that we have to prepend task message in front of other messages with same priority */
156 /* otherwise, child task in container task always has penalty then new task input */
157 ec_send_msg_to_engine_controller_with_compare_priority(new_task_msg, ec_compare_priority_of_task_message_append_way, NULL);
158 } else if (received_task_msg->task_state == EC_TASK_DONE) {
159 ec_send_msg_to_engine_controller_with_compare_priority(new_task_msg, ec_compare_priority_of_task_message_append_way, NULL);
161 _DEBUG_ERROR("task msg has invalid task state");
169 static void *_engine_controller_logic(ec_engine_controller_t * engine_controller)
173 retvm_if(engine_controller == NULL, NULL, "ec_engine_controller_t is NULL !!");
175 sync_agent_util_async_queue_s *queue = engine_controller->queue;
176 ec_thread_pool_t *pool = engine_controller->thread_pool;
177 ec_task_pool_t *task_pool = engine_controller->task_pool;
178 ec_task_t *task = NULL;
179 ec_task_t *parent_task = NULL;
180 sync_agent_ec_int child_task_index_in_parent_task = -1;
181 ec_task_message_t *received_task_msg = NULL;
182 GSList *runnable_child_task_list = NULL;
186 received_task_msg = sync_agent_receive_msg_async_queue(queue);
187 if (received_task_msg == NULL) {
188 _DEBUG_ERROR("null returned from receive_msg_fw_async_queue\n");
192 if (received_task_msg->task_state == EC_TASK_ROOT_START) {
193 task = ec_task_ref_task(received_task_msg->u.task);
194 ec_request_msg_t *request_msg = (ec_request_msg_t *) sync_agent_get_original_object(task->request_msg);
195 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_ROOT_START] task(msg id : %d, task name : %s, task spec id : %d) ========= \n\n", request_msg->msg_head.msg_id, task->task_spec->task_name, request_msg->task_spec_id);
197 assert(ec_task_is_root_task(task));
198 ec_task_pool_add_task(task_pool, request_msg->msg_head.msg_id, task);
200 ec_task_message_t *new_task_msg = ec_task_message_create(EC_TASK_START, task, 0);
201 if (new_task_msg == NULL) {
205 ec_send_msg_to_engine_controller_with_compare_priority(new_task_msg, ec_compare_priority_of_task_message_append_way, NULL);
207 ec_task_unref_task(task);
209 } else if (received_task_msg->task_state == EC_TASK_START) {
210 task = ec_task_ref_task(received_task_msg->u.task);
211 ec_request_msg_t *request_msg = (ec_request_msg_t *) sync_agent_get_original_object(task->request_msg);
212 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_START] =========\ntask(task name : %s) related to root_task(msg id : %d, task spec id : %d)\n\n", task->task_spec->task_name, request_msg->msg_head.msg_id, request_msg->task_spec_id);
214 /* queuing if task applied to queuing rule */
215 sync_agent_ec_boolean is_pending_task = ec_task_add_pending_task_list_of_first_progress_blocking_realized_entity(task);
216 if (is_pending_task) {
217 _DEBUG_TRACE("task name : %s pending by queuing rule\n", task->task_spec->task_name);
218 ec_task_unref_task(task);
222 if (ec_task_is_simple_task(task)) {
223 ec_thread_pool_push_task(pool, task);
226 if (ec_task_is_container_task(task)) {
227 /* update progress blocking realized entity when dynamic task case */
228 /* this can be done since other dynamic case conditions can be ignored now */
229 if (ec_task_spec_is_dynamic_container(task->task_spec)) {
230 ec_task_update_progress_blocking_realized_entity_and_pop_if_possible(task);
233 /* check runnable child task */
234 runnable_child_task_list = ec_collect_firstly_runnable_child_tasks(task);
235 if (runnable_child_task_list == NULL) {
236 _DEBUG_ERROR("no runnable child task exist.\n");
240 /* send task message to engine controller itself */
241 _send_new_runnerable_child_task_msgs_to_itself(runnable_child_task_list, received_task_msg);
244 ec_task_unref_task(task);
246 } else if (received_task_msg->task_state == EC_TASK_ROOT_DONE) {
247 task = ec_task_ref_task(received_task_msg->u.task);
248 ec_request_msg_t *request_msg = (ec_request_msg_t *) sync_agent_get_original_object(task->request_msg);
250 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_ROOT_DONE] task(result : %s), msg id : %d, task name : %s, task spec id : %d) ========= \n\n", ec_task_error_string(task->task_error), request_msg->msg_head.msg_id,
251 task->task_spec->task_name, request_msg->task_spec_id);
253 ec_task_pool_remove_task(task_pool, request_msg->msg_head.msg_id);
254 ec_task_call_task_finish_callback(task);
255 _DEBUG_TRACE("root task ec_task_call_task_finish_callback done\n");
256 _DEBUG_TRACE("root task done\n");
257 } else if (received_task_msg->task_state == EC_TASK_DONE) {
258 task = ec_task_ref_task(received_task_msg->u.task);
259 ec_request_msg_t *request_msg = (ec_request_msg_t *) sync_agent_get_original_object(task->request_msg);
262 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_DONE : %s] =========\ntask(task name : %s) related to root_task(msg id : %d, task spec id : %d)\n\n", ec_task_error_string(task->task_error), task->task_spec->task_name,
263 request_msg->msg_head.msg_id, request_msg->task_spec_id);
265 ec_task_update_progress_blocking_realized_entity_and_pop_if_possible(task);
267 if (ec_task_is_root_task(task)) {
269 ec_task_message_t *new_task_msg = ec_task_message_create(EC_TASK_ROOT_DONE, task, 0);
270 if (new_task_msg == NULL) {
274 ec_send_msg_to_engine_controller_with_compare_priority(new_task_msg, ec_compare_priority_of_task_message_append_way, NULL);
276 switch (task->task_error) {
277 case SYNC_AGENT_EC_TASK_ERROR_RUN_SUCCESS:
279 parent_task = ec_task_get_parent_task(task);
280 child_task_index_in_parent_task = ec_task_get_child_index_in_parent_task(task);
282 ec_task_decrease_left_child_to_run(parent_task);
284 if (ec_task_is_parent_EC_TASK_DONE(parent_task)) {
285 ec_task_set_task_error(parent_task, SYNC_AGENT_EC_TASK_ERROR_RUN_SUCCESS);
287 /* get parent task output from child tasks */
288 ec_task_collect_parent_task_output_parameter(parent_task);
290 /* send parent task done message to engine controller itself */
291 _send_parent_EC_TASK_DONE_msg_to_itseft(parent_task);
295 /* remove dependencies related to done operation */
296 /* if some operations has no dependency, then push in thread_pool */
297 runnable_child_task_list = ec_collect_become_runnable_child_tasks_by_remove_control_flow(parent_task, child_task_index_in_parent_task);
299 /* send task message to engine controller itself */
300 _send_new_runnerable_child_task_msgs_to_itself(runnable_child_task_list, received_task_msg);
304 case SYNC_AGENT_EC_TASK_ERROR_RUN_FAILED:
305 case SYNC_AGENT_EC_TASK_ERROR_CANCELED:
306 parent_task = ec_task_get_parent_task(task);
308 if (ec_task_is_not_yet_run(parent_task)) {
309 /* FIXME : do i have to wait other child task finished? */
310 ec_task_set_task_error(parent_task, task->task_error);
312 /* send parent task done message to engine controller itself */
313 _send_parent_EC_TASK_DONE_msg_to_itseft(parent_task);
316 default: /* TODO : other task error must be handled */
321 ec_task_unref_task(task);
323 } else if (received_task_msg->task_state == EC_TASK_CANCEL) {
324 task = ec_task_pool_fetch_task(task_pool, received_task_msg->u.request_msg_id);
326 ec_request_msg_t *request_msg = (ec_request_msg_t *) sync_agent_get_original_object(task->request_msg);
327 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_CANCEL] =========\ncancel task(request message id : %d, task name : %s) related to root_task(msg id : %d, task spec id : %d)\n\n", received_task_msg->u.request_msg_id,
328 task->task_spec->task_name, request_msg->msg_head.msg_id, request_msg->task_spec_id);
330 ec_task_do_cancellation(task);
331 ec_task_unref_task(task);
333 /* TODO : task request may be not handled yet. becuase cancel request has high priority */
334 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_CANCEL] =========\ncancel task(request message id : %d) received, " "but no such task exist. it could be already executed or you never requested\n\n",
335 received_task_msg->u.request_msg_id);
337 } else if (received_task_msg->task_state == EC_TASK_CANCEL_ALL) {
338 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_CANCEL_ALL] =========\n\n\n");
340 GList *all_tasks_list = ec_task_pool_fetch_all_tasks(task_pool);
342 ec_task_t *task = NULL;
343 sync_agent_ec_uint i = 0;
344 ec_request_msg_t *request_msg = NULL;
345 for (i = 0, iter = all_tasks_list; iter != NULL; i++, iter = g_list_next(iter)) {
347 request_msg = (ec_request_msg_t *) sync_agent_get_original_object(task->request_msg);
348 _DEBUG_TRACE("\n\n======= [CONTROLLER : EC_TASK_CANCEL_ALL_DETAIL (%d)] =========\ncancel task(request message id : %d, task name : %s) related to root_task(msg id : %d, task spec id : %d)\n\n", i,
349 received_task_msg->u.request_msg_id, task->task_spec->task_name, request_msg->msg_head.msg_id, request_msg->task_spec_id);
351 ec_task_do_cancellation(task);
352 ec_task_unref_task(task);
355 if (all_tasks_list != NULL)
356 g_list_free(all_tasks_list);
358 _DEBUG_ERROR("task msg has invalid task state\n");
363 ec_task_message_free(received_task_msg);
368 return NULL; /* TODO */
371 static void *_engine_controller_receiver_logic(ec_engine_controller_receiver_t * engine_controller_receiver)
375 retvm_if(engine_controller_receiver == NULL, NULL, "ec_engine_controller_receiver_t is NULL !!");
377 sync_agent_util_async_queue_s *queue = engine_controller_receiver->queue;
378 ec_task_info_pool_t *task_info_pool = engine_controller_receiver->task_info_pool;
379 ec_queuing_rule_spec_pool_t *queuing_rule_spec_pool = engine_controller_receiver->queuing_rule_spec_pool;
380 sync_agent_ec_error_e ec_error = SYNC_AGENT_EC_OK;
382 ec_task_message_t *task_msg = NULL;
383 SYNC_AGENT_UTIL_ASSERT_CONDITION(queue != NULL, "engine controller receiver has no queue\n");
386 void *msg = sync_agent_receive_msg_async_queue(queue);
387 if (ec_msg_is_register_msg(msg)) {
388 _DEBUG_TRACE("\n\n======= [RECEIVER : REGISTER TASK SPEC MSG] =========\n\n");
389 ec_register_msg_t *register_msg = (ec_register_msg_t *) msg;
391 /* create task info and add to task info pool */
392 ec_task_info_t *task_info = ec_task_info_new(register_msg->task_spec_id, register_msg->task_spec);
394 ec_task_info_pool_add_task_info(task_info_pool, task_info, false);
397 ec_msg_free_register_msg(register_msg);
398 } else if (ec_msg_is_unregister_msg(msg)) {
399 _DEBUG_TRACE("\n\n======= [RECEIVER : UNREGISTER TASK SPEC MSG] =========\n\n");
401 } else if (ec_msg_is_register_queuing_rule_spec_msg(msg)) {
402 _DEBUG_TRACE("\n\n======= [RECEIVER : REGISTER QUEUING RULE SPEC MSG] =========\n\n");
403 sync_agent_ec_uint queuing_rule_id = 0;
404 ec_register_queuing_rule_spec_msg_t *queuing_rule_register_msg = (ec_register_queuing_rule_spec_msg_t *) msg;
406 ec_error = ec_queuing_rule_spec_pool_add_queuing_rule_spec(queuing_rule_spec_pool, queuing_rule_register_msg->spec, &queuing_rule_id);
408 if (queuing_rule_register_msg->register_finish_callback != NULL) {
409 queuing_rule_register_msg->register_finish_callback(ec_error, queuing_rule_id, queuing_rule_register_msg->usr_data);
412 } else if (ec_msg_is_cancel_msg(msg)) {
413 _DEBUG_TRACE("\n\n======= [RECEIVER : CANCEL MSG] =========\n\n");
414 ec_cancel_msg_t *cancel_msg = ((ec_cancel_msg_t *) msg);
416 task_msg = ec_task_message_create(EC_TASK_CANCEL, NULL, cancel_msg->request_msg_id_to_cancel);
417 if (task_msg == NULL) {
418 _DEBUG_ERROR("out of memory during ec_task_message_create\n");
423 ec_send_msg_to_engine_controller_with_compare_priority(task_msg, ec_compare_priority_of_task_message_append_way, NULL);
425 } else if (ec_msg_is_cancel_all_msg(msg)) {
426 _DEBUG_TRACE("\n\n======= [RECEIVER : CANCEL ALL MSG] =========\n\n");
428 task_msg = ec_task_message_create(EC_TASK_CANCEL_ALL, NULL, 0);
429 if (task_msg == NULL) {
430 _DEBUG_ERROR("out of memory during ec_task_message_create\n");
435 ec_send_msg_to_engine_controller_with_compare_priority(task_msg, ec_compare_priority_of_task_message_append_way, NULL);
437 } else if (ec_msg_is_request_msg(msg)) {
439 _DEBUG_TRACE("\n\n======= [RECEIVER : REQUEST MSG] =========\n\n");
440 ec_request_msg_t *request_msg = ((ec_request_msg_t *) msg);
442 ec_task_info_t *task_info = ec_task_info_pool_search_task_info(task_info_pool, request_msg->task_spec_id);
443 sync_agent_ec_task_spec_s *task_spec = NULL;
444 ec_task_t *task = NULL;
446 if (task_info == NULL) {
447 _DEBUG_TRACE("no msg spec found\n");
450 task_spec = task_info->task_spec;
451 task = ec_task_alloc_root_task(task_spec, request_msg, task_info_pool);
453 _DEBUG_ERROR("out of memory during ec_task_alloc_root_task\n");
458 task_msg = ec_task_message_create(EC_TASK_ROOT_START, task, 0);
459 if (task_msg == NULL) {
460 _DEBUG_ERROR("out of memory during ec_task_message_create\n");
465 ec_send_msg_to_engine_controller_with_compare_priority(task_msg, ec_compare_priority_of_task_message_append_way, NULL);
466 ec_task_unref_task(task);
469 _DEBUG_TRACE("\n\n======= [RECEIVER : UNKNOWN MSG] =========\n\n");
470 /* unknown msg type */
471 _DEBUG_ERROR("unknown msg type\n");
477 return NULL; /* TODO */
480 ec_engine_controller_t *ec_alloc_engine_controller(sync_agent_ec_uint max_thread_count)
484 ec_engine_controller_t *engine_controller = (ec_engine_controller_t *) calloc(1, sizeof(ec_engine_controller_t));
485 if (engine_controller == NULL) {
489 engine_controller->queue = sync_agent_alloc_async_queue();
490 if (engine_controller->queue == NULL) {
494 engine_controller->task_pool = ec_task_pool_create_task_pool();
495 if (engine_controller->task_pool == NULL) {
499 engine_controller->queuing_rule_spec_pool = NULL;
501 engine_controller->thread_pool = ec_thread_pool_alloc_and_init(max_thread_count);
502 if (engine_controller->thread_pool == NULL) {
505 #if !GLIB_CHECK_VERSION (2, 32, 0)
506 ec_cancel_task = ec_thread_pool_get_cancel_task(engine_controller->thread_pool);
508 ec_cancel_task = ec_thread_pool_get_cancel_task();
510 if (ec_cancel_task == NULL) {
516 return engine_controller;
519 ec_free_engine_controller(engine_controller);
523 void ec_free_engine_controller(ec_engine_controller_t * engine_controller)
527 retm_if(engine_controller == NULL, "ec_engine_controller_t is NULL !!");
529 if (engine_controller != NULL) {
530 if (engine_controller->thread_pool) {
531 ec_thread_pool_destroy(engine_controller->thread_pool);
532 engine_controller->thread_pool = NULL;
534 if (engine_controller->queuing_rule_spec_pool) {
535 ec_queuing_rule_spec_pool_free(engine_controller->queuing_rule_spec_pool);
536 engine_controller->queuing_rule_spec_pool = NULL;
538 if (engine_controller->task_pool) {
539 ec_task_pool_free_task_pool(engine_controller->task_pool);
540 engine_controller->task_pool = NULL;
542 if (engine_controller->queue) {
543 util_destroy_async_queue(engine_controller->queue);
544 engine_controller->queue = NULL;
547 free(engine_controller);
553 /* TODO error handling */
554 void ec_run_engine_controller(ec_engine_controller_t * engine_controller)
558 retm_if(engine_controller == NULL, "ec_engine_controller_t is NULL !!");
560 pthread_create(&(engine_controller->thread_id), NULL, (ec_thread_start_routine_cb) _engine_controller_logic, (void *)engine_controller);
561 pthread_detach(engine_controller->thread_id);
566 void ec_stop_engine_controller(ec_engine_controller_t * engine_controller)
570 retm_if(engine_controller == NULL, "ec_engine_controller_t is NULL !!");
572 pthread_cancel(engine_controller->thread_id);
577 ec_engine_controller_receiver_t *ec_alloc_engine_controller_receiver(ec_engine_controller_t * engine_controller)
581 retvm_if(engine_controller == NULL, NULL, "ec_engine_controller_t is NULL !!");
583 ec_engine_controller_receiver_t *receiver = NULL;
585 receiver = (ec_engine_controller_receiver_t *) calloc(1, sizeof(ec_engine_controller_receiver_t));
586 if (receiver == NULL) {
590 receiver->next_msg_id = 0;
592 receiver->queue = sync_agent_alloc_async_queue();
593 if (receiver->queue == NULL) {
597 receiver->task_info_pool = ec_task_info_pool_alloc();
598 if (receiver->task_info_pool == NULL) {
602 /* TODO : remove below 1024 hard coding */
603 receiver->queuing_rule_spec_pool = ec_queuing_rule_spec_pool_alloc(1024, receiver->task_info_pool);
604 if (receiver->queuing_rule_spec_pool == NULL) {
608 receiver->engine_controller = engine_controller;
611 engine_controller->queuing_rule_spec_pool = receiver->queuing_rule_spec_pool;
618 ec_free_engine_controller_receiver(receiver);
622 void ec_free_engine_controller_receiver(ec_engine_controller_receiver_t * engine_controller_receiver)
626 retm_if(engine_controller_receiver == NULL, "ec_engine_controller_receiver_t is NULL !!");
628 if (engine_controller_receiver != NULL) {
629 if (engine_controller_receiver->queuing_rule_spec_pool) {
630 ec_queuing_rule_spec_pool_free(engine_controller_receiver->queuing_rule_spec_pool);
631 engine_controller_receiver->queuing_rule_spec_pool = NULL;
633 if (engine_controller_receiver->task_info_pool) {
634 ec_task_info_pool_free(engine_controller_receiver->task_info_pool);
635 engine_controller_receiver->task_info_pool = NULL;
637 if (engine_controller_receiver->queue) {
638 util_destroy_async_queue(engine_controller_receiver->queue);
639 engine_controller_receiver->queue = NULL;
642 free(engine_controller_receiver);
648 /* TODO error handling */
649 void ec_run_engine_controller_receiver(ec_engine_controller_receiver_t * engine_controller_receiver)
653 retm_if(engine_controller_receiver == NULL, "ec_engine_controller_receiver_t is NULL !!");
656 pthread_create(&(engine_controller_receiver->thread_id), NULL, (ec_thread_start_routine_cb) _engine_controller_receiver_logic, (void *)engine_controller_receiver);
657 pthread_detach(engine_controller_receiver->thread_id);
662 void ec_stop_engine_controller_receiver(ec_engine_controller_receiver_t * engine_controller_receiver)
666 retm_if(engine_controller_receiver == NULL, "ec_engine_controller_receiver_t is NULL !!");
668 pthread_cancel(engine_controller_receiver->thread_id);
673 void ec_send_msg_to_engine_controller_receiver(ec_msg_head_t * msg)
677 retm_if(msg == NULL, "ec_msg_head_t is NULL !!");
679 /* msg id management */
681 // g_atomic_int_exchange_and_add() to be deprecated
682 // msg->msg_id = g_atomic_int_exchange_and_add(&(global_engine_controller_receiver->next_msg_id), 1);
683 msg->msg_id = global_engine_controller_receiver->next_msg_id;
684 g_atomic_int_add(&(global_engine_controller_receiver->next_msg_id), 1);
686 _DEBUG_INFO("============= msg id = %d =============\n", msg->msg_id);
688 sync_agent_send_msg_async_queue(global_engine_controller_receiver->queue, (void *)msg);
693 void ec_send_msg_to_engine_controller(ec_task_message_t * task_msg)
697 retm_if(task_msg == NULL, "ec_task_message_t is NULL !!");
699 sync_agent_send_msg_async_queue(global_engine_controller->queue, (void *)task_msg);
704 void ec_send_msg_to_engine_controller_with_compare_priority(ec_task_message_t * task_msg, ec_compare_task_msg_priority_func_cb ctmp_func, sync_agent_ec_pointer user_data)
708 retm_if(task_msg == NULL, "ec_task_message_t is NULL !!");
710 util_send_msg_async_queue_with_compare_priority(global_engine_controller->queue, (void *)task_msg, (sync_agent_compare_priority_cb) ctmp_func, (void *)user_data);
715 /* external interfaces */
716 bool ec_init_engine_controller(unsigned int max_thread_count)
721 ec_engine_controller_t *engine_controller = NULL;
722 ec_engine_controller_receiver_t *engine_controller_receiver = NULL;
724 /* alloc engine controller */
725 engine_controller = ec_alloc_engine_controller(max_thread_count);
726 if (engine_controller == NULL) {
731 /* alloc engine controller receiver */
732 engine_controller_receiver = ec_alloc_engine_controller_receiver(engine_controller);
733 if (engine_controller_receiver == NULL) {
738 /* run engine controller */
739 ec_run_engine_controller(engine_controller);
740 ec_run_engine_controller_receiver(engine_controller_receiver);
744 global_engine_controller = engine_controller;
745 global_engine_controller_receiver = engine_controller_receiver;
747 /* TODO : stop engine controller & receiver & free */
755 void ec_deinit_engine_controller()
759 ec_stop_engine_controller_receiver(global_engine_controller_receiver);
760 ec_stop_engine_controller(global_engine_controller);
762 ec_free_engine_controller_receiver(global_engine_controller_receiver);
763 global_engine_controller_receiver = NULL;
764 ec_free_engine_controller(global_engine_controller);
765 global_engine_controller = NULL;
770 /* TODO : error handling */
771 EXPORT_API void sync_agent_register_task_spec(sync_agent_ec_uint task_spec_id, sync_agent_ec_char * task_spec_name, sync_agent_ec_task_spec_s * task_spec, sync_agent_calculate_identifier_cb cal_func)
775 retm_if(task_spec == NULL, "sync_agent_ec_task_spec_s is NULL !!");
777 ec_register_msg_t *register_msg = ec_msg_create_register_msg(task_spec_id, task_spec, cal_func);
778 ec_send_msg_to_engine_controller_receiver((ec_msg_head_t *) register_msg);
783 EXPORT_API void sync_agent_register_async_queuing_rule_spec(sync_agent_ec_queuing_rule_spec_s * spec, sync_agent_register_finish_cb register_finish_callback, sync_agent_ec_pointer usr_data)
787 ec_register_queuing_rule_spec_msg_t *msg = ec_msg_create_register_queuing_rule_spec_msg(spec, register_finish_callback, usr_data);
789 ec_send_msg_to_engine_controller_receiver((ec_msg_head_t *) msg);
794 void ec_register_sync_queuing_rule_spec(sync_agent_ec_queuing_rule_spec_s * spec, sync_agent_ec_error_e * ec_error, sync_agent_ec_uint * registered_id)
803 /* TODO : error handling */
804 EXPORT_API void sync_agent_request_async_task(sync_agent_ec_uint task_spec_id, sync_agent_ec_uint identifier, sync_agent_ec_int cnt_in_param, sync_agent_ec_int * in_param_index_array, sync_agent_ec_value_type_e * in_param_value_type_array,
805 sync_agent_ec_pointer * in_param_value_array, sync_agent_task_finish_cb task_finish_callback, sync_agent_ec_pointer simple_task_finish_callback_usr_data, sync_agent_ec_int * request_id)
809 ec_request_msg_t *msg = ec_msg_create_request_msg(task_spec_id, identifier, cnt_in_param,
810 in_param_index_array, in_param_value_type_array, in_param_value_array,
811 task_finish_callback, simple_task_finish_callback_usr_data);
813 retm_if(msg == NULL, "ec_request_msg_t is NULL !!");
815 ec_send_msg_to_engine_controller_receiver((ec_msg_head_t *) msg);
816 *request_id = msg->msg_head.msg_id;
821 EXPORT_API void sync_agent_request_sync_task(sync_agent_ec_uint task_spec_id, sync_agent_ec_uint identifier, sync_agent_ec_int cnt_in_param, sync_agent_ec_int * in_param_index_array, sync_agent_ec_value_type_e * in_param_value_type_array,
822 sync_agent_ec_pointer * in_param_value_array, sync_agent_ec_int * request_id, sync_agent_ec_task_error_e * task_error, sync_agent_ec_uint * out_param_cnt, sync_agent_ec_param_param_s *** out_param_array)
826 sync_agent_task_finish_cb sync_task_finish_callback = _static_sync_task_finish_callback;
827 ec_sync_task_output_t *sync_task_output = _alloc_and_init_sync_task_output();
828 if (sync_task_output == NULL) {
832 ec_request_msg_t *msg = ec_msg_create_request_msg(task_spec_id, identifier, cnt_in_param,
833 in_param_index_array, in_param_value_type_array, in_param_value_array,
834 sync_task_finish_callback, sync_task_output);
836 retm_if(msg == NULL, "ec_request_msg_t is NULL !!");
838 pthread_mutex_lock(&(sync_task_output->mutex));
839 ec_send_msg_to_engine_controller_receiver((ec_msg_head_t *) msg);
841 while (!g_atomic_int_get(&(sync_task_output->output_set_done))) {
842 pthread_cond_wait(&(sync_task_output->output_set_done_cond), &(sync_task_output->mutex));
845 pthread_mutex_unlock(&(sync_task_output->mutex));
847 *request_id = msg->msg_head.msg_id;
848 *task_error = sync_task_output->task_error;
849 *out_param_cnt = sync_task_output->out_param_cnt;
850 *out_param_array = sync_task_output->out_param_array;
852 _destory_sync_task_output(sync_task_output);
857 EXPORT_API void sync_agent_cancel_task(sync_agent_ec_int request_id_to_cancel)
861 ec_cancel_msg_t *cancel_msg = ec_msg_create_cancel_msg(request_id_to_cancel);
862 ec_send_msg_to_engine_controller_receiver((ec_msg_head_t *) cancel_msg);
867 EXPORT_API void sync_agent_cancel_all_tasks()
871 ec_cancel_all_msg_t *cancel_all_msg = ec_msg_create_cancel_all_msg();
872 ec_send_msg_to_engine_controller_receiver((ec_msg_head_t *) cancel_all_msg);