3 * Copyright 2004--2005, Google Inc.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * 1. Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright notice,
11 * this list of conditions and the following disclaimer in the documentation
12 * and/or other materials provided with the distribution.
13 * 3. The name of the author may not be used to endorse or promote products
14 * derived from this software without specific prior written permission.
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 #include "talk/base/common.h"
33 #include "talk/base/logging.h"
34 #include "talk/base/messagequeue.h"
35 #if defined(__native_client__)
36 #include "talk/base/nullsocketserver.h"
37 typedef talk_base::NullSocketServer DefaultSocketServer;
39 #include "talk/base/physicalsocketserver.h"
40 typedef talk_base::PhysicalSocketServer DefaultSocketServer;
// Latency budget for time-sensitive messages: Post() stamps such a message
// with Time() + kMaxMsgLatency, and Get() uses that stamp when it reports a
// late delivery.
45 const uint32 kMaxMsgLatency = 150; // 150 ms
47 //------------------------------------------------------------------
48 // MessageQueueManager
// Singleton pointer. Starts out NULL; Instance() stores the allocated manager
// here, and IsInitialized()/Remove()/Clear() test it for NULL.
50 MessageQueueManager* MessageQueueManager::instance_ = NULL;
// Accessor for the singleton manager. The manager is allocated with new and
// stored in the static instance_; callers such as Add() invoke methods on the
// returned pointer.
52 MessageQueueManager* MessageQueueManager::Instance() {
53 // Note: This is not thread safe, but it is first called before threads are
56 instance_ = new MessageQueueManager;
// Reports whether the singleton manager currently exists, i.e. whether
// instance_ is non-NULL. Does not create the manager.
60 bool MessageQueueManager::IsInitialized() {
61 return instance_ != NULL;
// Constructor and destructor of the manager.
// NOTE(review): no statements are visible in either body -- confirm that
// neither performs work beyond default member construction/destruction.
64 MessageQueueManager::MessageQueueManager() {
67 MessageQueueManager::~MessageQueueManager() {
// Static entry point for registering a queue: obtains the singleton through
// Instance() and forwards message_queue to AddInternal().
70 void MessageQueueManager::Add(MessageQueue *message_queue) {
71 return Instance()->AddInternal(message_queue);
// Appends message_queue to the message_queues_ list.
// The ASSERT checks that the calling thread does not already own crit_, i.e.
// that the manager is not being re-entered from inside one of its own calls.
73 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
74 // MessageQueueManager methods should be non-reentrant, so we
75 // ASSERT that is the case. If any of these ASSERT, please
76 // contact bpm or jbeda.
77 ASSERT(!crit_.CurrentThreadIsOwner());
79 message_queues_.push_back(message_queue);
// Static entry point for unregistering a queue. Returns immediately when no
// manager instance exists (so it never creates one); otherwise forwards
// message_queue to RemoveInternal() on the singleton.
82 void MessageQueueManager::Remove(MessageQueue *message_queue) {
83 // If there isn't a message queue manager instance, then there isn't a queue
85 if (!instance_) return;
86 return Instance()->RemoveInternal(message_queue);
// Removes message_queue from message_queues_ when it is present, and records
// in `destroy` whether the list is empty afterwards. Per the comment below,
// an empty list means the manager itself is to be torn down.
// Like the other *Internal methods, asserts that the caller does not already
// own crit_.
88 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
89 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
90 // If this is the last MessageQueue, destroy the manager as well so that
91 // we don't leak this object at program shutdown. As mentioned above, this is
92 // not thread-safe, but this should only happen at program termination (when
93 // the ThreadManager is destroyed, and threads are no longer active).
97 std::vector<MessageQueue *>::iterator iter;
// Linear search for the queue in the registered list.
98 iter = std::find(message_queues_.begin(), message_queues_.end(),
100 if (iter != message_queues_.end()) {
101 message_queues_.erase(iter);
// True once the last registered queue has been removed.
103 destroy = message_queues_.empty();
// Static entry point for purging a handler from all queues. Returns
// immediately when no manager instance exists; otherwise forwards handler to
// ClearInternal() on the singleton.
111 void MessageQueueManager::Clear(MessageHandler *handler) {
112 // If there isn't a message queue manager instance, then there aren't any
113 // queues to remove this handler from.
114 if (!instance_) return;
115 return Instance()->ClearInternal(handler);
// Calls Clear(handler) on every queue in message_queues_ while holding crit_
// for the duration of the walk. Asserts beforehand that the calling thread
// does not already own crit_.
117 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
118 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above.
119 CritScope cs(&crit_);
120 std::vector<MessageQueue *>::iterator iter;
121 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
122 (*iter)->Clear(handler);
125 //------------------------------------------------------------------
// Constructs a queue that uses socket server `ss`.
// Initializes ss_ from the argument and clears the fStop_ and fPeekKeep_
// flags. The body can create a DefaultSocketServer owned by default_ss_ and
// switch ss_ to it (NOTE(review): presumably only when `ss` is NULL -- the
// guarding condition is not among the visible lines; confirm). Finally the
// socket server is pointed back at this queue and the queue registers itself
// with MessageQueueManager.
128 MessageQueue::MessageQueue(SocketServer* ss)
129 : ss_(ss), fStop_(false), fPeekKeep_(false),
132 // Currently, MessageQueue holds a socket server, and is the base class for
133 // Thread. It seems like it makes more sense for Thread to hold the socket
134 // server, and provide it to the MessageQueue, since the Thread controls
135 // the I/O model, and MQ is agnostic to those details. Anyway, this causes
136 // messagequeue_unittest to depend on network libraries... yuck.
137 default_ss_.reset(new DefaultSocketServer());
138 ss_ = default_ss_.get();
140 ss_->SetMessageQueue(this);
141 MessageQueueManager::Add(this);
// Tears the queue down in this order: fires SignalQueueDestroyed, unregisters
// from MessageQueueManager, then detaches from the socket server by clearing
// its back-pointer to this queue.
144 MessageQueue::~MessageQueue() {
145 // The signal is done from here to ensure
146 // that it always gets called when the queue
148 SignalQueueDestroyed();
149 MessageQueueManager::Remove(this);
152 ss_->SetMessageQueue(NULL);
// Switches the queue to socket server `ss`; a NULL `ss` selects the server
// held by default_ss_ instead. The chosen server is then pointed at this
// queue.
// NOTE(review): if default_ss_ was never populated, a NULL `ss` would leave
// ss_ NULL before the dereference below -- confirm against the constructor
// and callers.
156 void MessageQueue::set_socketserver(SocketServer* ss) {
157 ss_ = ss ? ss : default_ss_.get();
158 ss_->SetMessageQueue(this);
// Quit / IsQuitting / Restart: stop-state control for the queue.
// NOTE(review): none of the three bodies is visible here. Presumably they
// set, read and clear the fStop_ flag that the constructor initializes to
// false -- confirm against the full definitions.
161 void MessageQueue::Quit() {
166 bool MessageQueue::IsQuitting() {
170 void MessageQueue::Restart() {
// Looks at the next message without consuming it.
// Fetches the message through Get(pmsg, cmsWait); the check below is the path
// taken when Get() fails. NOTE(review): the peeked message is presumably kept
// in msgPeek_ with fPeekKeep_ set (both are consulted by Clear()) -- the
// statements doing so are not visible; confirm.
174 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
179 if (!Get(pmsg, cmsWait))
// Retrieves the next message into *pmsg, waiting up to cmsWait milliseconds
// (kForever means no time limit).
//
// pmsg       - receives the message.
// cmsWait    - maximum time to wait, in ms, or kForever.
// process_io - passed through to the socket server's Wait().
//
// Outline of the visible logic:
//  * Under crit_, delayed messages whose trigger time is not later than the
//    current time are moved from dmsgq_ to msgq_; for the first entry that is
//    still in the future, the time until it fires is stored in cmsDelayNext.
//  * The front of msgq_ is copied into *pmsg when one is available.
//  * Outside the lock, time-sensitive messages are checked for lateness and
//    MQID_DISPOSE messages are handled specially (see comments below).
//  * Otherwise the call waits on the socket server for the shorter of the
//    remaining requested wait and cmsDelayNext, and gives up once the elapsed
//    time reaches cmsWait.
186 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
187 // Return and clear peek if present
188 // Always return the peek if it exists so there is Peek/Get symmetry
196 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
198 int cmsTotal = cmsWait;
// msStart is the time base for the elapsed-time computation near the end.
200 uint32 msStart = Time();
201 uint32 msCurrent = msStart;
203 // Check for sent messages
206 // Check for posted events
207 int cmsDelayNext = kForever;
208 bool first_pass = true;
210 // All queue operations need to be locked, but nothing else in this loop
211 // (specifically handling disposed message) can happen inside the crit.
212 // Otherwise, disposed MessageHandlers will cause deadlocks.
214 CritScope cs(&crit_);
215 // On the first pass, check for delayed messages that have been
216 // triggered and calculate the next trigger time.
219 while (!dmsgq_.empty()) {
// Earliest delayed message is still in the future: remember how long until
// it fires.
220 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
221 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
// Otherwise the delayed message is due: move it onto the ordinary queue.
224 msgq_.push_back(dmsgq_.top().msg_);
228 // Pull a message off the message queue, if available.
232 *pmsg = msgq_.front();
235 } // crit_ is released here.
237 // Log a warning for time-sensitive messages that we're late to deliver.
// ts_sensitive holds the delivery deadline set by Post() (0 when the message
// is not time-sensitive).
238 if (pmsg->ts_sensitive) {
239 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
241 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: "
242 << (delay + kMaxMsgLatency) << "ms";
245 // If this was a dispose message, delete it and skip it.
246 if (MQID_DISPOSE == pmsg->message_id) {
247 ASSERT(NULL == pmsg->phandler);
258 // Which is shorter, the delay wait or the asked wait?
261 if (cmsWait == kForever) {
262 cmsNext = cmsDelayNext;
// Finite wait: remaining budget, clamped at zero and capped by the next
// delayed-message trigger.
264 cmsNext = _max(0, cmsTotal - cmsElapsed);
265 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
266 cmsNext = cmsDelayNext;
269 // Wait and multiplex in the meantime
270 if (!ss_->Wait(cmsNext, process_io))
273 // If the specified timeout expired, return
276 cmsElapsed = TimeDiff(msCurrent, msStart);
277 if (cmsWait != kForever) {
278 if (cmsElapsed >= cmsWait)
// NOTE(review): body not visible. Presumably the hook behind the "Check for
// sent messages" step in Get() -- confirm against the full definition.
285 void MessageQueue::ReceiveSends() {
// Queues a message for `phandler` at the end of msgq_.
//
// phandler       - handler stored in the message.
// id, pdata      - message id and payload supplied by the caller.
// time_sensitive - when true, the message's ts_sensitive field is set to
//                  Time() + kMaxMsgLatency, which Get() later uses to detect
//                  a late delivery.
//
// The queue is modified while holding crit_.
288 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
289 MessageData *pdata, bool time_sensitive) {
294 // Add the message to the end of the queue
295 // Signal for the multiplexer to return
297 CritScope cs(&crit_);
299 msg.phandler = phandler;
302 if (time_sensitive) {
303 msg.ts_sensitive = Time() + kMaxMsgLatency;
305 msgq_.push_back(msg);
// Schedules a message for `phandler` for delayed delivery.
//
// While holding crit_, builds a Message and wraps it in a DelayedMessage
// constructed from cmsDelay, tstamp and the running sequence number
// dmsgq_next_num_. The sequence number is then incremented; VERIFY flags the
// case where it wraps around to 0 (see the comment below for why a wrap is
// tolerated).
309 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
310 MessageHandler *phandler, uint32 id, MessageData* pdata) {
315 // Add to the priority queue. Gets sorted soonest first.
316 // Signal for the multiplexer to return.
318 CritScope cs(&crit_);
320 msg.phandler = phandler;
323 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
325 // If this message queue processes 1 message every millisecond for 50 days,
326 // we will wrap this number. Even then, only messages with identical times
327 // will be misordered, and then only briefly. This is probably ok.
328 VERIFY(0 != ++dmsgq_next_num_);
// Computes a delay for this queue while holding crit_. When the delayed-message
// queue is non-empty, the delay is derived from TimeUntil() applied to the
// trigger time of its top entry.
332 int MessageQueue::GetDelay() {
333 CritScope cs(&crit_);
338 if (!dmsgq_.empty()) {
339 int delay = TimeUntil(dmsgq_.top().msTrigger_);
// Removes every pending message that Match(phandler, id) accepts, from all
// three places a message can be held: the peeked message, the ordinary queue
// msgq_ and the delayed queue dmsgq_.
//
// phandler, id - matching criteria, evaluated by Message::Match().
// removed      - list that receives matched messages on the visible
//                push_back paths; the other visible path deletes the
//                message's pdata instead.
//
// The whole operation runs under crit_.
348 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
349 MessageList* removed) {
350 CritScope cs(&crit_);
352 // Remove messages with phandler
354 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
356 removed->push_back(msgPeek_);
358 delete msgPeek_.pdata;
363 // Remove from ordered message queue
// erase() returns the iterator following the removed element, so the loop
// header deliberately has no increment.
365 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
366 if (it->Match(phandler, id)) {
368 removed->push_back(*it);
372 it = msgq_.erase(it);
378 // Remove from priority queue. Not directly iterable, so use this approach
// new_end tracks the end of the range of entries to keep; everything from
// new_end onward is erased after the scan.
380 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
381 for (PriorityQueue::container_type::iterator it = new_end;
382 it != dmsgq_.container().end(); ++it) {
383 if (it->msg_.Match(phandler, id)) {
385 removed->push_back(it->msg_);
387 delete it->msg_.pdata;
393 dmsgq_.container().erase(new_end, dmsgq_.container().end());
// Delivers pmsg to its handler by calling OnMessage() on pmsg->phandler.
// The handler pointer is dereferenced without a NULL check.
397 void MessageQueue::Dispatch(Message *pmsg) {
398 pmsg->phandler->OnMessage(pmsg);
401 } // namespace talk_base