Modified to read conditions aynchronously 89/56589/1
authorSomin Kim <somin926.kim@samsung.com>
Mon, 11 Jan 2016 09:32:03 +0000 (18:32 +0900)
committerSomin Kim <somin926.kim@samsung.com>
Mon, 11 Jan 2016 09:32:03 +0000 (18:32 +0900)
Change-Id: Ib85e46bb5184d91d52a08daafc36c5a3b24189a2
Signed-off-by: Somin Kim <somin926.kim@samsung.com>
src/context_trigger/context_monitor.cpp
src/context_trigger/context_monitor.h
src/context_trigger/fact_request.cpp
src/context_trigger/rule.cpp
src/context_trigger/rule.h

index b86d3dc805b785345dd9bbdfb3d97b66d0bd495b..dcd1369401912b20d4b26fbd428d95ea2d4a36bb 100644 (file)
@@ -25,7 +25,6 @@
 
 static int last_rid;
 static int last_err;
-static ctx::json last_data_read;
 
 ctx::context_manager_impl *ctx::context_monitor::_context_mgr = NULL;
 
@@ -68,20 +67,20 @@ int ctx::context_monitor::subscribe(int rule_id, std::string subject, ctx::json
                return ERR_NONE;
        }
 
-       int req_id = _subscribe(subject.c_str(), &option, listener, true);
+       int req_id = _subscribe(subject.c_str(), &option, listener);
        IF_FAIL_RETURN_TAG(req_id > 0, ERR_OPERATION_FAILED, _E, "Subscribe event failed");
        _D(YELLOW("Subscribe event(rule%d). req%d"), rule_id, req_id);
 
        return ERR_NONE;
 }
 
-int ctx::context_monitor::_subscribe(const char* subject, json* option, context_listener_iface* listener, bool wait_response)
+int ctx::context_monitor::_subscribe(const char* subject, json* option, context_listener_iface* listener)
 {
        IF_FAIL_RETURN(subject, ERR_INVALID_PARAMETER);
 
-       int rid = find_sub(subject, option);
+       int rid = find_sub(REQ_SUBSCRIBE, subject, option);
        if (rid > 0) {
-               add_listener(rid, listener);
+               add_listener(REQ_SUBSCRIBE, rid, listener);
                _D("Duplicated request for %s", subject);
                return rid;
        }
@@ -89,16 +88,14 @@ int ctx::context_monitor::_subscribe(const char* subject, json* option, context_
        rid = generate_req_id();
 
        fact_request *req = new(std::nothrow) fact_request(REQ_SUBSCRIBE,
-                       rid, subject, option ? option->str().c_str() : NULL, wait_response ? this : NULL);
+                       rid, subject, option ? option->str().c_str() : NULL, this);
        IF_FAIL_RETURN_TAG(req, -1, _E, "Memory allocation failed");
 
-       send_request(req);      //TODO: what happens if the request actually takes more time than the timeout
-       add_sub(rid, subject, option, listener);
-
-       IF_FAIL_RETURN_TAG(wait_response, rid, _D, "Ignoring response for %s", subject);
+       _context_mgr->assign_request(req);
+       add_sub(REQ_SUBSCRIBE, rid, subject, option, listener);
 
        if (last_err != ERR_NONE) {
-               remove_sub(rid);
+               remove_sub(REQ_SUBSCRIBE, rid);
                _E("Subscription request failed: %#x", last_err);
                return -1;
        }
@@ -113,13 +110,13 @@ int ctx::context_monitor::unsubscribe(int rule_id, std::string subject, ctx::jso
                return ERR_NONE;
        }
 
-       int rid = find_sub(subject.c_str(), &option);
+       int rid = find_sub(REQ_SUBSCRIBE, subject.c_str(), &option);
        if (rid < 0) {
                _D("Invalid unsubscribe request");
                return ERR_INVALID_PARAMETER;
        }
 
-       if (remove_listener(rid, listener) <= 0) {
+       if (remove_listener(REQ_SUBSCRIBE, rid, listener) <= 0) {
                _unsubscribe(subject.c_str(), rid);
        }
        _D(YELLOW("Unsubscribe event(rule%d). req%d"), rule_id, rid);
@@ -132,58 +129,52 @@ void ctx::context_monitor::_unsubscribe(const char *subject, int subscription_id
        fact_request *req = new(std::nothrow) fact_request(REQ_UNSUBSCRIBE, subscription_id, subject, NULL, NULL);
        IF_FAIL_VOID_TAG(req, _E, "Memory allocation failed");
 
-       send_request(req);
-       remove_sub(subscription_id);
-}
-
-bool ctx::context_monitor::send_request(fact_request* req)
-{
        _context_mgr->assign_request(req);
-       return false;
+       remove_sub(REQ_SUBSCRIBE, subscription_id);
 }
 
-int ctx::context_monitor::read(std::string subject, json option, ctx::json* result)
+int ctx::context_monitor::read(std::string subject, json option, context_listener_iface* listener)
 {
        if (subject.compare(TIMER_CONDITION_SUBJECT) == 0) {
 //             return timer->read(result);     TODO
                return ERR_NONE;
        }
 
-/*     context_fact fact;
-       bool ret = _read(subject.c_str(), &option, fact);
-       IF_FAIL_RETURN_TAG(ret, ERR_OPERATION_FAILED, _E, "Read fact failed");
+       int req_id = _read(subject.c_str(), &option, listener);
+       IF_FAIL_RETURN_TAG(req_id > 0, ERR_OPERATION_FAILED, _E, "Read condition failed");
+       _D(YELLOW("Read condition(%s). req%d"), subject.c_str(), req_id);
 
-       *result = fact.get_data();
-*/
        return ERR_NONE;
 }
 
-/*bool ctx::context_monitor::_read(const char* subject, json* option, context_fact& fact)
+int ctx::context_monitor::_read(const char* subject, json* option, context_listener_iface* listener)
 {
-       // TODO implement read async
-       IF_FAIL_RETURN(subject, false);
+       IF_FAIL_RETURN(subject, ERR_INVALID_PARAMETER);
 
-       int rid = generate_req_id();
+       int rid = find_sub(REQ_READ, subject, option);
+       if (rid > 0) {
+               add_listener(REQ_READ, rid, listener);
+               _D("Duplicated request for %s", subject);
+               return rid;
+       }
+
+       rid = generate_req_id();
 
-       fact_request *req = new(std::nothrow) fact_request(REQ_READ_SYNC,
+       fact_request *req = new(std::nothrow) fact_request(REQ_READ,
                        rid, subject, option ? option->str().c_str() : NULL, this);
-       IF_FAIL_RETURN_TAG(req, false, _E, "Memory allocation failed");
+       IF_FAIL_RETURN_TAG(req, -1, _E, "Memory allocation failed");
 
-       send_request(req);
+       _context_mgr->assign_request(req);
+       add_sub(REQ_READ, rid, subject, option, listener);
 
        if (last_err != ERR_NONE) {
                _E("Read request failed: %#x", last_err);
-               return false;
+               return -1;
        }
 
-       fact.set_req_id(rid);
-       fact.set_subject(subject);
-       fact.set_data(last_data_read);
-       last_data_read = EMPTY_JSON_OBJECT;
-
-       return true;
+       return rid;
 }
-*/
+
 bool ctx::context_monitor::is_supported(std::string subject)
 {
        if (subject.compare(TIMER_EVENT_SUBJECT) == 0
@@ -214,15 +205,17 @@ bool ctx::context_monitor::get_fact_definition(std::string &subject, int &operat
        return _context_mgr->pop_trigger_item(subject, operation, attributes, options);
 }
 
-int ctx::context_monitor::find_sub(const char* subject, json* option)
+int ctx::context_monitor::find_sub(request_type type, const char* subject, json* option)
 {
        // @return      request id
+       subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
        json opt_j;
        if (option) {
                opt_j = *option;
        }
 
-       for (subscr_map_t::iterator it = subscr_map.begin(); it != subscr_map.end(); ++it) {
+       for (subscr_map_t::iterator it = map->begin(); it != map->end(); ++it) {
                if ((*(it->second)).subject == subject && (*(it->second)).option == opt_j) {
                        return it->first;
                }
@@ -231,47 +224,56 @@ int ctx::context_monitor::find_sub(const char* subject, json* option)
        return -1;
 }
 
-bool ctx::context_monitor::add_sub(int sid, const char* subject, json* option, context_listener_iface* listener)
+bool ctx::context_monitor::add_sub(request_type type, int sid, const char* subject, json* option, context_listener_iface* listener)
 {
+       subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
        subscr_info_s *info = new(std::nothrow) subscr_info_s(sid, subject, option);
        IF_FAIL_RETURN_TAG(info, false, _E, "Memory allocation failed");
        info->listener_list.push_back(listener);
 
-       subscr_map[sid] = info;
+       map->insert(std::pair<int, subscr_info_s*>(sid, info));
+
        return true;
 }
 
-void ctx::context_monitor::remove_sub(const char* subject, json* option)
+void ctx::context_monitor::remove_sub(request_type type, const char* subject, json* option)
 {
+       subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
        json opt_j;
        if (option) {
                opt_j = *option;
        }
 
-       for (subscr_map_t::iterator it = subscr_map.begin(); it != subscr_map.end(); ++it) {
+       for (subscr_map_t::iterator it = map->begin(); it != map->end(); ++it) {
                if ((*(it->second)).subject == subject && (*(it->second)).option == opt_j) {
                        delete it->second;
-                       subscr_map.erase(it);
+                       map->erase(it);
                        return;
                }
        }
 }
 
-void ctx::context_monitor::remove_sub(int sid)
+void ctx::context_monitor::remove_sub(request_type type, int sid)
 {
-       subscr_info_s* info = subscr_map[sid];
+       subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
+       subscr_info_s* info = map->at(sid);
        info->listener_list.clear();
 
        delete info;
-       subscr_map.erase(sid);
+       map->erase(sid);
 
        return;
 }
 
-int ctx::context_monitor::add_listener(int sid, context_listener_iface* listener)
+int ctx::context_monitor::add_listener(request_type type, int sid, context_listener_iface* listener)
 {
        // @return      number of listeners for the corresponding sid
-       subscr_map_t::iterator it = subscr_map.find(sid);
+       subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
+       subscr_map_t::iterator it = map->find(sid);
 
        subscr_info_s* info = it->second;
        info->listener_list.push_back(listener);
@@ -279,10 +281,12 @@ int ctx::context_monitor::add_listener(int sid, context_listener_iface* listener
        return info->listener_list.size();
 }
 
-int ctx::context_monitor::remove_listener(int sid, context_listener_iface* listener)
+int ctx::context_monitor::remove_listener(request_type type, int sid, context_listener_iface* listener)
 {
        // @return      number of listeners for the corresponding sid
-       subscr_map_t::iterator it = subscr_map.find(sid);
+       subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
+       subscr_map_t::iterator it = map->find(sid);
 
        subscr_info_s* info = it->second;
 
@@ -296,17 +300,32 @@ int ctx::context_monitor::remove_listener(int sid, context_listener_iface* liste
        return info->listener_list.size();
 }
 
-void ctx::context_monitor::reply_result(int req_id, int error, json* request_result, json* fact)
+void ctx::context_monitor::reply_result(int req_id, int error, json* request_result)
 {
        _D("Request result received: %d", req_id);
+
        last_rid = req_id;
        last_err = error;
-       last_data_read = (fact ? *fact : EMPTY_JSON_OBJECT);
+}
+
+void ctx::context_monitor::reply_result(int req_id, int error, const char* subject, json* option, json* fact)
+{
+       _D(YELLOW("Condition received: subject(%s), option(%s), fact(%s)"), subject, option->str().c_str(), fact->str().c_str());
+
+       subscr_map_t::iterator it = read_map.find(req_id);
+       IF_FAIL_VOID_TAG(it != read_map.end(), _E, "Request id not found");
+
+       subscr_info_s* info = it->second;
+       for (listener_list_t::iterator it2 = info->listener_list.begin(); it2 != info->listener_list.end(); ++it2) {
+               (*it2)->on_condition_received(subject, *option, *fact);
+       }
+
+       remove_sub(REQ_READ, req_id);
 }
 
 void ctx::context_monitor::publish_fact(int req_id, int error, const char* subject, json* option, json* fact)
 {
-       _D(YELLOW("Fact received: subject(%s), option(%s), fact(%s)"), subject, option->str().c_str(), fact->str().c_str());
+       _D(YELLOW("Event received: subject(%s), option(%s), fact(%s)"), subject, option->str().c_str(), fact->str().c_str());
 
        subscr_map_t::iterator it = subscr_map.find(req_id);
        IF_FAIL_VOID_TAG(it != subscr_map.end(), _E, "Request id not found");
@@ -315,5 +334,4 @@ void ctx::context_monitor::publish_fact(int req_id, int error, const char* subje
        for (listener_list_t::iterator it2 = info->listener_list.begin(); it2 != info->listener_list.end(); ++it2) {
                (*it2)->on_event_received(subject, *option, *fact);
        }
-
 }
index f53157eb04a66c88860550b4e92d5ecaeb13e1d9..c8f2a4bb8d86f5937e428bfe55f0b0490a0f1b11 100644 (file)
@@ -36,19 +36,20 @@ namespace ctx {
 
                int subscribe(int rule_id, std::string subject, ctx::json option, context_listener_iface* listener);
                int unsubscribe(int rule_id, std::string subject, ctx::json option, context_listener_iface* listener);
-               int read(std::string subject, json option, ctx::json* result);
+               int read(std::string subject, json option, context_listener_iface* listener);
                bool is_supported(std::string subject);
                bool is_allowed(const char *client, const char *subject);
 
-               void reply_result(int req_id, int error, json *request_result = NULL, ctx::json *fact = NULL);
-               void publish_fact(int req_id, int error, const char *subject, json *option, ctx::json *fact);
+               void reply_result(int req_id, int error, json *request_result = NULL);
+               void reply_result(int req_id, int error, const char *subject, ctx::json *option, ctx::json *fact);
+               void publish_fact(int req_id, int error, const char *subject, ctx::json *option, ctx::json *fact);
 
                bool get_fact_definition(std::string &subject, int &operation, ctx::json &attributes, ctx::json &options);
 
        private:
-               int _subscribe(const char* subject, ctx::json* option, context_listener_iface* listener, bool wait_response);
+               int _subscribe(const char* subject, ctx::json* option, context_listener_iface* listener);
                void _unsubscribe(const char *subject, int subscription_id);
-//             bool _read(const char *subject, ctx::json *option, context_fact& fact);
+               int _read(const char *subject, ctx::json *option, context_listener_iface* listener);
 
                ctx::trigger_timer* timer;
                static context_manager_impl *_context_mgr;
@@ -71,15 +72,15 @@ namespace ctx {
 
                typedef std::map<int, subscr_info_s*> subscr_map_t;
                subscr_map_t subscr_map;
+               subscr_map_t read_map;
 
-               int find_sub(const char *subject, ctx::json *option);
-               bool add_sub(int sid, const char *subject, ctx::json *option, context_listener_iface* listener);
-               void remove_sub(const char *subject, ctx::json *option);
-               void remove_sub(int sid);
-               int add_listener(int sid, context_listener_iface* listener);
-               int remove_listener(int sid, context_listener_iface* listener);
+               int find_sub(request_type type, const char *subject, ctx::json *option);
+               bool add_sub(request_type type, int sid, const char *subject, ctx::json *option, context_listener_iface* listener);
+               void remove_sub(request_type type, const char *subject, ctx::json *option);
+               void remove_sub(request_type type, int sid);
+               int add_listener(request_type type, int sid, context_listener_iface* listener);
+               int remove_listener(request_type type, int sid, context_listener_iface* listener);
 
-               static bool send_request(fact_request* req);
        };      /* class context_monitor */
 
 }      /* namespace ctx */
index 78955cced7edbd32a867c2cdff7cb3407267275a..751d15fa66f17cbb8c8fb9b834b8d88ff3b63ef1 100644 (file)
@@ -38,21 +38,22 @@ bool ctx::fact_request::reply(int error)
 {
        IF_FAIL_RETURN(!replied && _ctx_monitor, true);
        _ctx_monitor->reply_result(_req_id, error);
-       return (replied = true);
+       replied = (error != ERR_NONE);
+       return true;
 }
 
 bool ctx::fact_request::reply(int error, ctx::json& request_result)
 {
        IF_FAIL_RETURN(!replied && _ctx_monitor, true);
-       IF_FAIL_RETURN(_type != REQ_READ_SYNC, true);
        _ctx_monitor->reply_result(_req_id, error, &request_result);
-       return (replied = true);
+       replied = (error != ERR_NONE);
+       return true;
 }
 
 bool ctx::fact_request::reply(int error, ctx::json& request_result, ctx::json& data_read)
 {
        IF_FAIL_RETURN(!replied && _ctx_monitor, true);
-       _ctx_monitor->reply_result(_req_id, error, &request_result, &data_read);
+       _ctx_monitor->reply_result(_req_id, error, _subject.c_str(), &get_description(), &data_read);
        return (replied = true);
 }
 
index 7d15056b5d3eb28491deba2df01928279c5749e7..09d314ecd78beed2ab049702208bf408436d81ce 100644 (file)
@@ -31,7 +31,8 @@ ctx::trigger_rule::trigger_rule()
 }
 
 ctx::trigger_rule::trigger_rule(int i, ctx::json& d, const char* cr, context_monitor* cm)
-       : ctx_monitor(cm)
+       : result(EMPTY_JSON_OBJECT)
+       , ctx_monitor(cm)
        , id(i)
        , creator(cr)
 {
@@ -84,9 +85,44 @@ int ctx::trigger_rule::stop(void)
        return error;
 }
 
+bool ctx::trigger_rule::set_condition_option_based_on_event(ctx::json& option)
+{
+       // Set condition option if it references event data
+       std::list<std::string> option_keys;
+       option.get_keys(&option_keys);
+
+       for (std::list<std::string>::iterator it = option_keys.begin(); it != option_keys.end(); ++it) {
+               std::string opt_key = (*it);
+
+               std::string opt_val;
+               if (option.get(NULL, opt_key.c_str(), &opt_val)) {
+                       if (opt_val.find("?") != 0) {
+                               continue;
+                       }
+
+                       std::string event_key = opt_val.substr(1, opt_val.length() - 1);
+
+                       std::string new_str;
+                       int new_val;
+                       if (result.get(CONTEXT_RULE_EVENT "." CONTEXT_RULE_DATA, event_key.c_str(), &new_str)) {
+                               option.set(NULL, opt_key.c_str(), new_str);
+                       } else if (result.get(CONTEXT_RULE_EVENT "." CONTEXT_RULE_DATA, event_key.c_str(), &new_val)) {
+                               option.set(NULL, opt_key.c_str(), new_val);
+                       } else {
+                               _W("Failed to find '%s' in event data", event_key.c_str());
+                               return false;
+                       }
+               }
+       }
+
+       return true;
+}
+
 void ctx::trigger_rule::on_event_received(std::string name, ctx::json option, ctx::json data)
 {
-       clear_result();
+       if (result != EMPTY_JSON_OBJECT) {
+               clear_result();
+       }
        _D("Rule%d received event data", id);
 
        // Set event data
@@ -95,12 +131,22 @@ void ctx::trigger_rule::on_event_received(std::string name, ctx::json option, ct
        result.set(CONTEXT_RULE_EVENT, CONTEXT_RULE_DATA, data);
 
        if (condition.size() == 0) {
-               on_context_data_prepared(data);
+               on_context_data_prepared();
                return;
        }
 
+       // TODO check if event matched first
+
+       // Request read conditions
        for (std::list<context_item_t>::iterator it = condition.begin(); it != condition.end(); ++it) {
-               // TODO send read request for each condition
+               ctx::json cond_option = (*it)->option.str();
+               if (!set_condition_option_based_on_event(cond_option)) { // cond_option should be copy of original option.
+                       clear_result();
+                       return;
+               }
+
+               int error = ctx_monitor->read((*it)->name.c_str(), cond_option, this);
+               IF_FAIL_VOID_TAG(error == ERR_NONE, _E, "Failed to read condition");
        }
 
        // TODO timer set
@@ -108,6 +154,8 @@ void ctx::trigger_rule::on_event_received(std::string name, ctx::json option, ct
 
 void ctx::trigger_rule::on_condition_received(std::string name, ctx::json option, ctx::json data)
 {
+       _D("Rule%d received condition data", id);
+
        // Set condition data
        ctx::json item;
        item.set(NULL, CONTEXT_RULE_NAME, name);
@@ -116,19 +164,19 @@ void ctx::trigger_rule::on_condition_received(std::string name, ctx::json option
        result.array_append(NULL, CONTEXT_RULE_CONDITION, item);
 
        if (result.array_get_size(NULL, CONTEXT_RULE_CONDITION) == (int) condition.size()) {
-               on_context_data_prepared(data);
+               on_context_data_prepared();
        }
 }
 
 void ctx::trigger_rule::clear_result()
 {
-       result = json();
+       result = EMPTY_JSON_OBJECT;
        // TODO timer cancel
 }
 
-void ctx::trigger_rule::on_context_data_prepared(ctx::json& data)
+void ctx::trigger_rule::on_context_data_prepared(void)
 {
-       if (ctx::rule_evaluator::evaluate_rule(statement, data)) {
+       if (ctx::rule_evaluator::evaluate_rule(statement, result)) {
                ctx::action_manager::trigger_action(action, creator);
        }
 
index 43b3109622775248989763ee6935560cd14db3f5..4e3490e3c29043a7aa06db71a3fd100e5fca8522 100644 (file)
@@ -51,7 +51,8 @@ namespace ctx {
                        context_monitor* ctx_monitor;
 
                        void clear_result(void);
-                       void on_context_data_prepared(ctx::json& data);
+                       bool set_condition_option_based_on_event(ctx::json& option);
+                       void on_context_data_prepared(void);
 
                public:
                        int id;