static int last_rid;
static int last_err;
-static ctx::json last_data_read;
ctx::context_manager_impl *ctx::context_monitor::_context_mgr = NULL;
return ERR_NONE;
}
- int req_id = _subscribe(subject.c_str(), &option, listener, true);
+ int req_id = _subscribe(subject.c_str(), &option, listener);
IF_FAIL_RETURN_TAG(req_id > 0, ERR_OPERATION_FAILED, _E, "Subscribe event failed");
_D(YELLOW("Subscribe event(rule%d). req%d"), rule_id, req_id);
return ERR_NONE;
}
-int ctx::context_monitor::_subscribe(const char* subject, json* option, context_listener_iface* listener, bool wait_response)
+int ctx::context_monitor::_subscribe(const char* subject, json* option, context_listener_iface* listener)
{
IF_FAIL_RETURN(subject, ERR_INVALID_PARAMETER);
- int rid = find_sub(subject, option);
+ int rid = find_sub(REQ_SUBSCRIBE, subject, option);
if (rid > 0) {
- add_listener(rid, listener);
+ add_listener(REQ_SUBSCRIBE, rid, listener);
_D("Duplicated request for %s", subject);
return rid;
}
rid = generate_req_id();
fact_request *req = new(std::nothrow) fact_request(REQ_SUBSCRIBE,
- rid, subject, option ? option->str().c_str() : NULL, wait_response ? this : NULL);
+ rid, subject, option ? option->str().c_str() : NULL, this);
IF_FAIL_RETURN_TAG(req, -1, _E, "Memory allocation failed");
- send_request(req); //TODO: what happens if the request actually takes more time than the timeout
- add_sub(rid, subject, option, listener);
-
- IF_FAIL_RETURN_TAG(wait_response, rid, _D, "Ignoring response for %s", subject);
+ _context_mgr->assign_request(req);
+ add_sub(REQ_SUBSCRIBE, rid, subject, option, listener);
if (last_err != ERR_NONE) {
- remove_sub(rid);
+ remove_sub(REQ_SUBSCRIBE, rid);
_E("Subscription request failed: %#x", last_err);
return -1;
}
return ERR_NONE;
}
- int rid = find_sub(subject.c_str(), &option);
+ int rid = find_sub(REQ_SUBSCRIBE, subject.c_str(), &option);
if (rid < 0) {
_D("Invalid unsubscribe request");
return ERR_INVALID_PARAMETER;
}
- if (remove_listener(rid, listener) <= 0) {
+ if (remove_listener(REQ_SUBSCRIBE, rid, listener) <= 0) {
_unsubscribe(subject.c_str(), rid);
}
_D(YELLOW("Unsubscribe event(rule%d). req%d"), rule_id, rid);
fact_request *req = new(std::nothrow) fact_request(REQ_UNSUBSCRIBE, subscription_id, subject, NULL, NULL);
IF_FAIL_VOID_TAG(req, _E, "Memory allocation failed");
- send_request(req);
- remove_sub(subscription_id);
-}
-
-bool ctx::context_monitor::send_request(fact_request* req)
-{
_context_mgr->assign_request(req);
- return false;
+ remove_sub(REQ_SUBSCRIBE, subscription_id);
}
-int ctx::context_monitor::read(std::string subject, json option, ctx::json* result)
+int ctx::context_monitor::read(std::string subject, json option, context_listener_iface* listener)
{
if (subject.compare(TIMER_CONDITION_SUBJECT) == 0) {
// return timer->read(result); TODO
return ERR_NONE;
}
-/* context_fact fact;
- bool ret = _read(subject.c_str(), &option, fact);
- IF_FAIL_RETURN_TAG(ret, ERR_OPERATION_FAILED, _E, "Read fact failed");
+ int req_id = _read(subject.c_str(), &option, listener);
+ IF_FAIL_RETURN_TAG(req_id > 0, ERR_OPERATION_FAILED, _E, "Read condition failed");
+ _D(YELLOW("Read condition(%s). req%d"), subject.c_str(), req_id);
- *result = fact.get_data();
-*/
return ERR_NONE;
}
-/*bool ctx::context_monitor::_read(const char* subject, json* option, context_fact& fact)
+int ctx::context_monitor::_read(const char* subject, json* option, context_listener_iface* listener)
{
- // TODO implement read async
- IF_FAIL_RETURN(subject, false);
+ IF_FAIL_RETURN(subject, ERR_INVALID_PARAMETER);
- int rid = generate_req_id();
+ int rid = find_sub(REQ_READ, subject, option);
+ if (rid > 0) {
+ add_listener(REQ_READ, rid, listener);
+ _D("Duplicated request for %s", subject);
+ return rid;
+ }
+
+ rid = generate_req_id();
- fact_request *req = new(std::nothrow) fact_request(REQ_READ_SYNC,
+ fact_request *req = new(std::nothrow) fact_request(REQ_READ,
rid, subject, option ? option->str().c_str() : NULL, this);
- IF_FAIL_RETURN_TAG(req, false, _E, "Memory allocation failed");
+ IF_FAIL_RETURN_TAG(req, -1, _E, "Memory allocation failed");
- send_request(req);
+ _context_mgr->assign_request(req);
+ add_sub(REQ_READ, rid, subject, option, listener);
if (last_err != ERR_NONE) {
_E("Read request failed: %#x", last_err);
- return false;
+ return -1;
}
- fact.set_req_id(rid);
- fact.set_subject(subject);
- fact.set_data(last_data_read);
- last_data_read = EMPTY_JSON_OBJECT;
-
- return true;
+ return rid;
}
-*/
+
bool ctx::context_monitor::is_supported(std::string subject)
{
if (subject.compare(TIMER_EVENT_SUBJECT) == 0
return _context_mgr->pop_trigger_item(subject, operation, attributes, options);
}
-int ctx::context_monitor::find_sub(const char* subject, json* option)
+int ctx::context_monitor::find_sub(request_type type, const char* subject, json* option)
{
// @return request id
+ subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
json opt_j;
if (option) {
opt_j = *option;
}
- for (subscr_map_t::iterator it = subscr_map.begin(); it != subscr_map.end(); ++it) {
+ for (subscr_map_t::iterator it = map->begin(); it != map->end(); ++it) {
if ((*(it->second)).subject == subject && (*(it->second)).option == opt_j) {
return it->first;
}
return -1;
}
-bool ctx::context_monitor::add_sub(int sid, const char* subject, json* option, context_listener_iface* listener)
+bool ctx::context_monitor::add_sub(request_type type, int sid, const char* subject, json* option, context_listener_iface* listener)
{
+ subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
subscr_info_s *info = new(std::nothrow) subscr_info_s(sid, subject, option);
IF_FAIL_RETURN_TAG(info, false, _E, "Memory allocation failed");
info->listener_list.push_back(listener);
- subscr_map[sid] = info;
+ map->insert(std::pair<int, subscr_info_s*>(sid, info));
+
return true;
}
-void ctx::context_monitor::remove_sub(const char* subject, json* option)
+void ctx::context_monitor::remove_sub(request_type type, const char* subject, json* option)
{
+ subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
json opt_j;
if (option) {
opt_j = *option;
}
- for (subscr_map_t::iterator it = subscr_map.begin(); it != subscr_map.end(); ++it) {
+ for (subscr_map_t::iterator it = map->begin(); it != map->end(); ++it) {
if ((*(it->second)).subject == subject && (*(it->second)).option == opt_j) {
delete it->second;
- subscr_map.erase(it);
+ map->erase(it);
return;
}
}
}
-void ctx::context_monitor::remove_sub(int sid)
+void ctx::context_monitor::remove_sub(request_type type, int sid)
{
- subscr_info_s* info = subscr_map[sid];
+ subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
+ subscr_info_s* info = map->at(sid);
info->listener_list.clear();
delete info;
- subscr_map.erase(sid);
+ map->erase(sid);
return;
}
-int ctx::context_monitor::add_listener(int sid, context_listener_iface* listener)
+int ctx::context_monitor::add_listener(request_type type, int sid, context_listener_iface* listener)
{
// @return number of listeners for the corresponding sid
- subscr_map_t::iterator it = subscr_map.find(sid);
+ subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
+ subscr_map_t::iterator it = map->find(sid);
subscr_info_s* info = it->second;
info->listener_list.push_back(listener);
return info->listener_list.size();
}
-int ctx::context_monitor::remove_listener(int sid, context_listener_iface* listener)
+int ctx::context_monitor::remove_listener(request_type type, int sid, context_listener_iface* listener)
{
// @return number of listeners for the corresponding sid
- subscr_map_t::iterator it = subscr_map.find(sid);
+ subscr_map_t* map = (type == REQ_SUBSCRIBE)? &subscr_map : &read_map;
+
+ subscr_map_t::iterator it = map->find(sid);
subscr_info_s* info = it->second;
return info->listener_list.size();
}
-void ctx::context_monitor::reply_result(int req_id, int error, json* request_result, json* fact)
+void ctx::context_monitor::reply_result(int req_id, int error, json* request_result)
{
_D("Request result received: %d", req_id);
+
last_rid = req_id;
last_err = error;
- last_data_read = (fact ? *fact : EMPTY_JSON_OBJECT);
+}
+
+void ctx::context_monitor::reply_result(int req_id, int error, const char* subject, json* option, json* fact)
+{
+ _D(YELLOW("Condition received: subject(%s), option(%s), fact(%s)"), subject, option->str().c_str(), fact->str().c_str());
+
+ subscr_map_t::iterator it = read_map.find(req_id);
+ IF_FAIL_VOID_TAG(it != read_map.end(), _E, "Request id not found");
+
+ subscr_info_s* info = it->second;
+ for (listener_list_t::iterator it2 = info->listener_list.begin(); it2 != info->listener_list.end(); ++it2) {
+ (*it2)->on_condition_received(subject, *option, *fact);
+ }
+
+ remove_sub(REQ_READ, req_id);
}
void ctx::context_monitor::publish_fact(int req_id, int error, const char* subject, json* option, json* fact)
{
- _D(YELLOW("Fact received: subject(%s), option(%s), fact(%s)"), subject, option->str().c_str(), fact->str().c_str());
+ _D(YELLOW("Event received: subject(%s), option(%s), fact(%s)"), subject, option->str().c_str(), fact->str().c_str());
subscr_map_t::iterator it = subscr_map.find(req_id);
IF_FAIL_VOID_TAG(it != subscr_map.end(), _E, "Request id not found");
for (listener_list_t::iterator it2 = info->listener_list.begin(); it2 != info->listener_list.end(); ++it2) {
(*it2)->on_event_received(subject, *option, *fact);
}
-
}
int subscribe(int rule_id, std::string subject, ctx::json option, context_listener_iface* listener);
int unsubscribe(int rule_id, std::string subject, ctx::json option, context_listener_iface* listener);
- int read(std::string subject, json option, ctx::json* result);
+ int read(std::string subject, json option, context_listener_iface* listener);
bool is_supported(std::string subject);
bool is_allowed(const char *client, const char *subject);
- void reply_result(int req_id, int error, json *request_result = NULL, ctx::json *fact = NULL);
- void publish_fact(int req_id, int error, const char *subject, json *option, ctx::json *fact);
+ void reply_result(int req_id, int error, json *request_result = NULL);
+ void reply_result(int req_id, int error, const char *subject, ctx::json *option, ctx::json *fact);
+ void publish_fact(int req_id, int error, const char *subject, ctx::json *option, ctx::json *fact);
bool get_fact_definition(std::string &subject, int &operation, ctx::json &attributes, ctx::json &options);
private:
- int _subscribe(const char* subject, ctx::json* option, context_listener_iface* listener, bool wait_response);
+ int _subscribe(const char* subject, ctx::json* option, context_listener_iface* listener);
void _unsubscribe(const char *subject, int subscription_id);
-// bool _read(const char *subject, ctx::json *option, context_fact& fact);
+ int _read(const char *subject, ctx::json *option, context_listener_iface* listener);
ctx::trigger_timer* timer;
static context_manager_impl *_context_mgr;
typedef std::map<int, subscr_info_s*> subscr_map_t;
subscr_map_t subscr_map;
+ subscr_map_t read_map;
- int find_sub(const char *subject, ctx::json *option);
- bool add_sub(int sid, const char *subject, ctx::json *option, context_listener_iface* listener);
- void remove_sub(const char *subject, ctx::json *option);
- void remove_sub(int sid);
- int add_listener(int sid, context_listener_iface* listener);
- int remove_listener(int sid, context_listener_iface* listener);
+ int find_sub(request_type type, const char *subject, ctx::json *option);
+ bool add_sub(request_type type, int sid, const char *subject, ctx::json *option, context_listener_iface* listener);
+ void remove_sub(request_type type, const char *subject, ctx::json *option);
+ void remove_sub(request_type type, int sid);
+ int add_listener(request_type type, int sid, context_listener_iface* listener);
+ int remove_listener(request_type type, int sid, context_listener_iface* listener);
- static bool send_request(fact_request* req);
}; /* class context_monitor */
} /* namespace ctx */
}
ctx::trigger_rule::trigger_rule(int i, ctx::json& d, const char* cr, context_monitor* cm)
- : ctx_monitor(cm)
+ : result(EMPTY_JSON_OBJECT)
+ , ctx_monitor(cm)
, id(i)
, creator(cr)
{
return error;
}
+bool ctx::trigger_rule::set_condition_option_based_on_event(ctx::json& option)
+{
+ // Set condition option if it references event data
+ std::list<std::string> option_keys;
+ option.get_keys(&option_keys);
+
+ for (std::list<std::string>::iterator it = option_keys.begin(); it != option_keys.end(); ++it) {
+ std::string opt_key = (*it);
+
+ std::string opt_val;
+ if (option.get(NULL, opt_key.c_str(), &opt_val)) {
+ if (opt_val.find("?") != 0) {
+ continue;
+ }
+
+ std::string event_key = opt_val.substr(1, opt_val.length() - 1);
+
+ std::string new_str;
+ int new_val;
+ if (result.get(CONTEXT_RULE_EVENT "." CONTEXT_RULE_DATA, event_key.c_str(), &new_str)) {
+ option.set(NULL, opt_key.c_str(), new_str);
+ } else if (result.get(CONTEXT_RULE_EVENT "." CONTEXT_RULE_DATA, event_key.c_str(), &new_val)) {
+ option.set(NULL, opt_key.c_str(), new_val);
+ } else {
+ _W("Failed to find '%s' in event data", event_key.c_str());
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
void ctx::trigger_rule::on_event_received(std::string name, ctx::json option, ctx::json data)
{
- clear_result();
+ if (result != EMPTY_JSON_OBJECT) {
+ clear_result();
+ }
_D("Rule%d received event data", id);
// Set event data
result.set(CONTEXT_RULE_EVENT, CONTEXT_RULE_DATA, data);
if (condition.size() == 0) {
- on_context_data_prepared(data);
+ on_context_data_prepared();
return;
}
+ // TODO check if event matched first
+
+ // Request read conditions
for (std::list<context_item_t>::iterator it = condition.begin(); it != condition.end(); ++it) {
- // TODO send read request for each condition
+ ctx::json cond_option = (*it)->option.str();
+ if (!set_condition_option_based_on_event(cond_option)) { // cond_option should be copy of original option.
+ clear_result();
+ return;
+ }
+
+ int error = ctx_monitor->read((*it)->name.c_str(), cond_option, this);
+ IF_FAIL_VOID_TAG(error == ERR_NONE, _E, "Failed to read condition");
}
// TODO timer set
void ctx::trigger_rule::on_condition_received(std::string name, ctx::json option, ctx::json data)
{
+ _D("Rule%d received condition data", id);
+
// Set condition data
ctx::json item;
item.set(NULL, CONTEXT_RULE_NAME, name);
result.array_append(NULL, CONTEXT_RULE_CONDITION, item);
if (result.array_get_size(NULL, CONTEXT_RULE_CONDITION) == (int) condition.size()) {
- on_context_data_prepared(data);
+ on_context_data_prepared();
}
}
void ctx::trigger_rule::clear_result()
{
- result = json();
+ result = EMPTY_JSON_OBJECT;
// TODO timer cancel
}
-void ctx::trigger_rule::on_context_data_prepared(ctx::json& data)
+void ctx::trigger_rule::on_context_data_prepared(void)
{
- if (ctx::rule_evaluator::evaluate_rule(statement, data)) {
+ if (ctx::rule_evaluator::evaluate_rule(statement, result)) {
ctx::action_manager::trigger_action(action, creator);
}