Addition of a 10 second timeout on websocket get and getRanged requests.
authorMichael Carpenter <malcom2073@gmail.com>
Sat, 29 Dec 2012 19:36:29 +0000 (14:36 -0500)
committerMichael Carpenter <malcom2073@gmail.com>
Sat, 29 Dec 2012 19:36:29 +0000 (14:36 -0500)
plugins/websocketsourceplugin/websocketsource.cpp
plugins/websocketsourceplugin/websocketsource.h

index 6b75e2a..7596012 100644 (file)
@@ -124,6 +124,39 @@ bool gioPollingFunc(GIOChannel *source,GIOCondition condition,gpointer data)
        return true;
 }
 
+static int checkTimeouts(gpointer data)
+{
+       WebSocketSource *src = (WebSocketSource*)data;
+       for (auto i=src->uuidTimeoutMap.begin();i!= src->uuidTimeoutMap.end();i++)
+       {
+               if (src->uuidRangedReplyMap.find((*i).first) != src->uuidRangedReplyMap.end())
+               {
+                       //A source exists!
+                       if (amb::currentTime() > (*i).second)
+                       {
+                               //We've reached timeout
+                               DebugOut() << "Timeout reached for request ID:" << (*i).first << "\n";
+                               src->uuidRangedReplyMap[(*i).first]->success = false;
+                               src->uuidRangedReplyMap[(*i).first]->completed(src->uuidRangedReplyMap[(*i).first]);
+                               src->uuidRangedReplyMap.erase((*i).first);
+                               src->uuidTimeoutMap.erase((*i).first);
+                               i--;
+                       }
+                       else
+                       {
+                               //No timeout yet, keep waiting.
+                       }
+               }
+               else
+               {
+                       //Reply has already come back, ignore and erase from list.
+                       src->uuidTimeoutMap.erase((*i).first);
+                       i--;
+               }
+
+       }
+}
+
 static int callback_http_only(libwebsocket_context *context,struct libwebsocket *wsi,enum libwebsocket_callback_reasons reason,void *user, void *in, size_t len)
 {
        unsigned char buf[LWS_SEND_BUFFER_PRE_PADDING + 4096 + LWS_SEND_BUFFER_POST_PADDING];
@@ -320,10 +353,17 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                                        propertylist.push_back(type);
                                                        
                                                }
-                                               source->uuidRangedReplyMap[id]->values = propertylist;
-                                               source->uuidRangedReplyMap[id]->success = true;
-                                               source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
-                                               source->uuidRangedReplyMap.erase(id);
+                                               if (source->uuidRangedReplyMap.find(id) != source->uuidRangedReplyMap.end())
+                                               {
+                                                       source->uuidRangedReplyMap[id]->values = propertylist;
+                                                       source->uuidRangedReplyMap[id]->success = true;
+                                                       source->uuidRangedReplyMap[id]->completed(source->uuidRangedReplyMap[id]);
+                                                       source->uuidRangedReplyMap.erase(id);
+                                               }
+                                               else
+                                               {
+                                                       DebugOut() << "getRanged methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
+                                               }
                                                while (propertylist.size() > 0)
                                                {
                                                        
@@ -362,11 +402,18 @@ static int callback_http_only(libwebsocket_context *context,struct libwebsocket
                                                        AbstractPropertyType* v = VehicleProperty::getPropertyTypeForPropertyNameValue(property,value);
                                                        v->timestamp = boost::lexical_cast<double,std::string>(timestamp);
                                                        v->sequence = boost::lexical_cast<double,std::string>(sequence);
-                                                       source->uuidReplyMap[id]->value = v;
-                                                       source->uuidReplyMap[id]->success = true;
-                                                       source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
-                                                       source->uuidReplyMap.erase(id);
-                                                       delete v;
+                                                       if (source->uuidReplyMap.find(id) != source->uuidReplyMap.end())
+                                                       {
+                                                               source->uuidReplyMap[id]->value = v;
+                                                               source->uuidReplyMap[id]->success = true;
+                                                               source->uuidReplyMap[id]->completed(source->uuidReplyMap[id]);
+                                                               source->uuidReplyMap.erase(id);
+                                                               delete v;
+                                                       }
+                                                       else
+                                                       {
+                                                               DebugOut() << "get methodReply has been recieved, without a request being in!. This is likely due to a request coming in after the timeout has elapsed.\n";
+                                                       }
                                                }
                                        }
                                        else
@@ -421,6 +468,7 @@ WebSocketSource::WebSocketSource(AbstractRoutingEngine *re, map<string, string>
        re->setSupported(supported(), this);
 
        //printf("websocketsource loaded!!!\n");
+       g_timeout_add(1000,checkTimeouts,this); //Do this once per second, check for functions that have timed out and reply with success = false;
 
 }
 PropertyList WebSocketSource::supported()
@@ -474,6 +522,7 @@ void WebSocketSource::getPropertyAsync(AsyncPropertyReply *reply)
        //propertyReplyMap[reply->property] = reply;
        std::string uuid = amb::createUuid();
        uuidReplyMap[uuid] = reply;
+       uuidTimeoutMap[uuid] = amb::currentTime() + 10.0; ///TODO: 10 second timeout, make this configurable?
        stringstream s;  
        
        s << "{\"type\":\"method\",\"name\":\"get\",\"data\":[\"" << reply->property << "\"],\"transactionid\":\"" << uuid << "\"}";
@@ -492,6 +541,7 @@ void WebSocketSource::getRangePropertyAsync(AsyncRangePropertyReply *reply)
        ///TODO: fill in
        std::string uuid = amb::createUuid();
        uuidRangedReplyMap[uuid] = reply;
+       uuidTimeoutMap[uuid] = amb::currentTime() + 60; ///TODO: 60 second timeout, make this configurable?
        stringstream s;  
        s << "{\"type\":\"method\",\"name\":\"getRange\",\"data\": {";
        s << "\"timeBegin\":\"" << reply->timeBegin << "\",";
index 556ecee..0212dfc 100644 (file)
@@ -56,6 +56,7 @@ public:
        //map<VehicleProperty::Property,AsyncPropertyReply*> propertyReplyMap;
        //map<VehicleProperty::Property,AsyncRangePropertyReply*> rangedPropertyReplyMap;
        map<std::string,AsyncPropertyReply*> uuidReplyMap;
+       map<std::string,double> uuidTimeoutMap;
        map<std::string,AsyncRangePropertyReply*> uuidRangedReplyMap;