- add third_party src.
[platform/framework/web/crosswalk.git] / src / third_party / libjingle / source / talk / base / messagequeue.cc
1 /*
2  * libjingle
3  * Copyright 2004--2005, Google Inc.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  *
8  *  1. Redistributions of source code must retain the above copyright notice,
9  *     this list of conditions and the following disclaimer.
10  *  2. Redistributions in binary form must reproduce the above copyright notice,
11  *     this list of conditions and the following disclaimer in the documentation
12  *     and/or other materials provided with the distribution.
13  *  3. The name of the author may not be used to endorse or promote products
14  *     derived from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26  */
27
28 #ifdef POSIX
29 #include <sys/time.h>
30 #endif
31
32 #include "talk/base/common.h"
33 #include "talk/base/logging.h"
34 #include "talk/base/messagequeue.h"
35 #include "talk/base/physicalsocketserver.h"
36
37
38 namespace talk_base {
39
40 const uint32 kMaxMsgLatency = 150;  // 150 ms
41
42 //------------------------------------------------------------------
43 // MessageQueueManager
44
45 MessageQueueManager* MessageQueueManager::instance_ = NULL;
46
47 MessageQueueManager* MessageQueueManager::Instance() {
48   // Note: This is not thread safe, but it is first called before threads are
49   // spawned.
50   if (!instance_)
51     instance_ = new MessageQueueManager;
52   return instance_;
53 }
54
55 bool MessageQueueManager::IsInitialized() {
56   return instance_ != NULL;
57 }
58
59 MessageQueueManager::MessageQueueManager() {
60 }
61
62 MessageQueueManager::~MessageQueueManager() {
63 }
64
65 void MessageQueueManager::Add(MessageQueue *message_queue) {
66   return Instance()->AddInternal(message_queue);
67 }
68 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
69   // MessageQueueManager methods should be non-reentrant, so we
70   // ASSERT that is the case.  If any of these ASSERT, please
71   // contact bpm or jbeda.
72   ASSERT(!crit_.CurrentThreadIsOwner());
73   CritScope cs(&crit_);
74   message_queues_.push_back(message_queue);
75 }
76
77 void MessageQueueManager::Remove(MessageQueue *message_queue) {
78   // If there isn't a message queue manager instance, then there isn't a queue
79   // to remove.
80   if (!instance_) return;
81   return Instance()->RemoveInternal(message_queue);
82 }
83 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
84   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
85   // If this is the last MessageQueue, destroy the manager as well so that
86   // we don't leak this object at program shutdown. As mentioned above, this is
87   // not thread-safe, but this should only happen at program termination (when
88   // the ThreadManager is destroyed, and threads are no longer active).
89   bool destroy = false;
90   {
91     CritScope cs(&crit_);
92     std::vector<MessageQueue *>::iterator iter;
93     iter = std::find(message_queues_.begin(), message_queues_.end(),
94                      message_queue);
95     if (iter != message_queues_.end()) {
96       message_queues_.erase(iter);
97     }
98     destroy = message_queues_.empty();
99   }
100   if (destroy) {
101     instance_ = NULL;
102     delete this;
103   }
104 }
105
106 void MessageQueueManager::Clear(MessageHandler *handler) {
107   // If there isn't a message queue manager instance, then there aren't any
108   // queues to remove this handler from.
109   if (!instance_) return;
110   return Instance()->ClearInternal(handler);
111 }
112 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
113   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
114   CritScope cs(&crit_);
115   std::vector<MessageQueue *>::iterator iter;
116   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
117     (*iter)->Clear(handler);
118 }
119
120 //------------------------------------------------------------------
121 // MessageQueue
122
123 MessageQueue::MessageQueue(SocketServer* ss)
124     : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
125       dmsgq_next_num_(0) {
126   if (!ss_) {
127     // Currently, MessageQueue holds a socket server, and is the base class for
128     // Thread.  It seems like it makes more sense for Thread to hold the socket
129     // server, and provide it to the MessageQueue, since the Thread controls
130     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
131     // messagequeue_unittest to depend on network libraries... yuck.
132     default_ss_.reset(new PhysicalSocketServer());
133     ss_ = default_ss_.get();
134   }
135   ss_->SetMessageQueue(this);
136 }
137
138 MessageQueue::~MessageQueue() {
139   // The signal is done from here to ensure
140   // that it always gets called when the queue
141   // is going away.
142   SignalQueueDestroyed();
143   if (active_) {
144     MessageQueueManager::Remove(this);
145     Clear(NULL);
146   }
147   if (ss_) {
148     ss_->SetMessageQueue(NULL);
149   }
150 }
151
152 void MessageQueue::set_socketserver(SocketServer* ss) {
153   ss_ = ss ? ss : default_ss_.get();
154   ss_->SetMessageQueue(this);
155 }
156
157 void MessageQueue::Quit() {
158   fStop_ = true;
159   ss_->WakeUp();
160 }
161
162 bool MessageQueue::IsQuitting() {
163   return fStop_;
164 }
165
166 void MessageQueue::Restart() {
167   fStop_ = false;
168 }
169
170 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
171   if (fPeekKeep_) {
172     *pmsg = msgPeek_;
173     return true;
174   }
175   if (!Get(pmsg, cmsWait))
176     return false;
177   msgPeek_ = *pmsg;
178   fPeekKeep_ = true;
179   return true;
180 }
181
182 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
183   // Return and clear peek if present
184   // Always return the peek if it exists so there is Peek/Get symmetry
185
186   if (fPeekKeep_) {
187     *pmsg = msgPeek_;
188     fPeekKeep_ = false;
189     return true;
190   }
191
192   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
193
194   int cmsTotal = cmsWait;
195   int cmsElapsed = 0;
196   uint32 msStart = Time();
197   uint32 msCurrent = msStart;
198   while (true) {
199     // Check for sent messages
200     ReceiveSends();
201
202     // Check for posted events
203     int cmsDelayNext = kForever;
204     bool first_pass = true;
205     while (true) {
206       // All queue operations need to be locked, but nothing else in this loop
207       // (specifically handling disposed message) can happen inside the crit.
208       // Otherwise, disposed MessageHandlers will cause deadlocks.
209       {
210         CritScope cs(&crit_);
211         // On the first pass, check for delayed messages that have been
212         // triggered and calculate the next trigger time.
213         if (first_pass) {
214           first_pass = false;
215           while (!dmsgq_.empty()) {
216             if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
217               cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
218               break;
219             }
220             msgq_.push_back(dmsgq_.top().msg_);
221             dmsgq_.pop();
222           }
223         }
224         // Pull a message off the message queue, if available.
225         if (msgq_.empty()) {
226           break;
227         } else {
228           *pmsg = msgq_.front();
229           msgq_.pop_front();
230         }
231       }  // crit_ is released here.
232
233       // Log a warning for time-sensitive messages that we're late to deliver.
234       if (pmsg->ts_sensitive) {
235         int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
236         if (delay > 0) {
237           LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
238                             << (delay + kMaxMsgLatency) << "ms";
239         }
240       }
241       // If this was a dispose message, delete it and skip it.
242       if (MQID_DISPOSE == pmsg->message_id) {
243         ASSERT(NULL == pmsg->phandler);
244         delete pmsg->pdata;
245         *pmsg = Message();
246         continue;
247       }
248       return true;
249     }
250
251     if (fStop_)
252       break;
253
254     // Which is shorter, the delay wait or the asked wait?
255
256     int cmsNext;
257     if (cmsWait == kForever) {
258       cmsNext = cmsDelayNext;
259     } else {
260       cmsNext = _max(0, cmsTotal - cmsElapsed);
261       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
262         cmsNext = cmsDelayNext;
263     }
264
265     // Wait and multiplex in the meantime
266     if (!ss_->Wait(cmsNext, process_io))
267       return false;
268
269     // If the specified timeout expired, return
270
271     msCurrent = Time();
272     cmsElapsed = TimeDiff(msCurrent, msStart);
273     if (cmsWait != kForever) {
274       if (cmsElapsed >= cmsWait)
275         return false;
276     }
277   }
278   return false;
279 }
280
281 void MessageQueue::ReceiveSends() {
282 }
283
284 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
285     MessageData *pdata, bool time_sensitive) {
286   if (fStop_)
287     return;
288
289   // Keep thread safe
290   // Add the message to the end of the queue
291   // Signal for the multiplexer to return
292
293   CritScope cs(&crit_);
294   EnsureActive();
295   Message msg;
296   msg.phandler = phandler;
297   msg.message_id = id;
298   msg.pdata = pdata;
299   if (time_sensitive) {
300     msg.ts_sensitive = Time() + kMaxMsgLatency;
301   }
302   msgq_.push_back(msg);
303   ss_->WakeUp();
304 }
305
306 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
307     MessageHandler *phandler, uint32 id, MessageData* pdata) {
308   if (fStop_)
309     return;
310
311   // Keep thread safe
312   // Add to the priority queue. Gets sorted soonest first.
313   // Signal for the multiplexer to return.
314
315   CritScope cs(&crit_);
316   EnsureActive();
317   Message msg;
318   msg.phandler = phandler;
319   msg.message_id = id;
320   msg.pdata = pdata;
321   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
322   dmsgq_.push(dmsg);
323   // If this message queue processes 1 message every millisecond for 50 days,
324   // we will wrap this number.  Even then, only messages with identical times
325   // will be misordered, and then only briefly.  This is probably ok.
326   VERIFY(0 != ++dmsgq_next_num_);
327   ss_->WakeUp();
328 }
329
330 int MessageQueue::GetDelay() {
331   CritScope cs(&crit_);
332
333   if (!msgq_.empty())
334     return 0;
335
336   if (!dmsgq_.empty()) {
337     int delay = TimeUntil(dmsgq_.top().msTrigger_);
338     if (delay < 0)
339       delay = 0;
340     return delay;
341   }
342
343   return kForever;
344 }
345
346 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
347                          MessageList* removed) {
348   CritScope cs(&crit_);
349
350   // Remove messages with phandler
351
352   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
353     if (removed) {
354       removed->push_back(msgPeek_);
355     } else {
356       delete msgPeek_.pdata;
357     }
358     fPeekKeep_ = false;
359   }
360
361   // Remove from ordered message queue
362
363   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
364     if (it->Match(phandler, id)) {
365       if (removed) {
366         removed->push_back(*it);
367       } else {
368         delete it->pdata;
369       }
370       it = msgq_.erase(it);
371     } else {
372       ++it;
373     }
374   }
375
376   // Remove from priority queue. Not directly iterable, so use this approach
377
378   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
379   for (PriorityQueue::container_type::iterator it = new_end;
380        it != dmsgq_.container().end(); ++it) {
381     if (it->msg_.Match(phandler, id)) {
382       if (removed) {
383         removed->push_back(it->msg_);
384       } else {
385         delete it->msg_.pdata;
386       }
387     } else {
388       *new_end++ = *it;
389     }
390   }
391   dmsgq_.container().erase(new_end, dmsgq_.container().end());
392   dmsgq_.reheap();
393 }
394
395 void MessageQueue::Dispatch(Message *pmsg) {
396   pmsg->phandler->OnMessage(pmsg);
397 }
398
399 void MessageQueue::EnsureActive() {
400   ASSERT(crit_.CurrentThreadIsOwner());
401   if (!active_) {
402     active_ = true;
403     MessageQueueManager::Add(this);
404   }
405 }
406
407 }  // namespace talk_base