Upstream version 10.39.225.0
[platform/framework/web/crosswalk.git] / src / jingle / glue / thread_wrapper.cc
1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
4
5 #include "jingle/glue/thread_wrapper.h"
6
7 #include "base/bind.h"
8 #include "base/bind_helpers.h"
9 #include "base/lazy_instance.h"
10 #include "base/threading/thread_local.h"
11 #include "third_party/webrtc/base/nullsocketserver.h"
12
13 namespace jingle_glue {
14
15 struct JingleThreadWrapper::PendingSend {
16   PendingSend(const rtc::Message& message_value)
17       : sending_thread(JingleThreadWrapper::current()),
18         message(message_value),
19         done_event(true, false) {
20     DCHECK(sending_thread);
21   }
22
23   JingleThreadWrapper* sending_thread;
24   rtc::Message message;
25   base::WaitableEvent done_event;
26 };
27
28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
29     g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
30
31 // static
32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
33   if (JingleThreadWrapper::current() == NULL) {
34     base::MessageLoop* message_loop = base::MessageLoop::current();
35     g_jingle_thread_wrapper.Get()
36         .Set(new JingleThreadWrapper(message_loop->message_loop_proxy()));
37     message_loop->AddDestructionObserver(current());
38   }
39
40   DCHECK_EQ(rtc::Thread::Current(), current());
41 }
42
43 // static
44 JingleThreadWrapper* JingleThreadWrapper::current() {
45   return g_jingle_thread_wrapper.Get().Get();
46 }
47
48 JingleThreadWrapper::JingleThreadWrapper(
49     scoped_refptr<base::SingleThreadTaskRunner> task_runner)
50     : rtc::Thread(new rtc::NullSocketServer()),
51       task_runner_(task_runner),
52       send_allowed_(false),
53       last_task_id_(0),
54       pending_send_event_(true, false),
55       weak_ptr_factory_(this) {
56   DCHECK(task_runner->BelongsToCurrentThread());
57   DCHECK(!rtc::Thread::Current());
58   weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
59   rtc::MessageQueueManager::Add(this);
60   SafeWrapCurrent();
61 }
62
63 JingleThreadWrapper::~JingleThreadWrapper() {
64   Clear(NULL, rtc::MQID_ANY, NULL);
65 }
66
67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
68   DCHECK_EQ(rtc::Thread::Current(), current());
69   UnwrapCurrent();
70   g_jingle_thread_wrapper.Get().Set(NULL);
71   rtc::ThreadManager::Instance()->SetCurrentThread(NULL);
72   rtc::MessageQueueManager::Remove(this);
73   rtc::SocketServer* ss = socketserver();
74   delete this;
75   delete ss;
76 }
77
78 void JingleThreadWrapper::Post(
79     rtc::MessageHandler* handler, uint32 message_id,
80     rtc::MessageData* data, bool time_sensitive) {
81   PostTaskInternal(0, handler, message_id, data);
82 }
83
84 void JingleThreadWrapper::PostDelayed(
85     int delay_ms, rtc::MessageHandler* handler,
86     uint32 message_id, rtc::MessageData* data) {
87   PostTaskInternal(delay_ms, handler, message_id, data);
88 }
89
90 void JingleThreadWrapper::Clear(rtc::MessageHandler* handler, uint32 id,
91                                 rtc::MessageList* removed) {
92   base::AutoLock auto_lock(lock_);
93
94   for (MessagesQueue::iterator it = messages_.begin();
95        it != messages_.end();) {
96     MessagesQueue::iterator next = it;
97     ++next;
98
99     if (it->second.Match(handler, id)) {
100       if (removed) {
101         removed->push_back(it->second);
102       } else {
103         delete it->second.pdata;
104       }
105       messages_.erase(it);
106     }
107
108     it = next;
109   }
110
111   for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
112        it != pending_send_messages_.end();) {
113     std::list<PendingSend*>::iterator next = it;
114     ++next;
115
116     if ((*it)->message.Match(handler, id)) {
117       if (removed) {
118         removed ->push_back((*it)->message);
119       } else {
120         delete (*it)->message.pdata;
121       }
122       (*it)->done_event.Signal();
123       pending_send_messages_.erase(it);
124     }
125
126     it = next;
127   }
128 }
129
130 void JingleThreadWrapper::Send(rtc::MessageHandler *handler, uint32 id,
131                                rtc::MessageData *data) {
132   if (fStop_)
133     return;
134
135   JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
136   DCHECK(current_thread != NULL) << "Send() can be called only from a "
137       "thread that has JingleThreadWrapper.";
138
139   rtc::Message message;
140   message.phandler = handler;
141   message.message_id = id;
142   message.pdata = data;
143
144   if (current_thread == this) {
145     handler->OnMessage(&message);
146     return;
147   }
148
149   // Send message from a thread different than |this|.
150
151   // Allow inter-thread send only from threads that have
152   // |send_allowed_| flag set.
153   DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
154       "messages is not allowed from the current thread.";
155
156   PendingSend pending_send(message);
157   {
158     base::AutoLock auto_lock(lock_);
159     pending_send_messages_.push_back(&pending_send);
160   }
161
162   // Need to signal |pending_send_event_| here in case the thread is
163   // sending message to another thread.
164   pending_send_event_.Signal();
165   task_runner_->PostTask(FROM_HERE,
166                          base::Bind(&JingleThreadWrapper::ProcessPendingSends,
167                                     weak_ptr_));
168
169
170   while (!pending_send.done_event.IsSignaled()) {
171     base::WaitableEvent* events[] = {&pending_send.done_event,
172                                      &current_thread->pending_send_event_};
173     size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
174     DCHECK(event == 0 || event == 1);
175
176     if (event == 1)
177       current_thread->ProcessPendingSends();
178   }
179 }
180
181 void JingleThreadWrapper::ProcessPendingSends() {
182   while (true) {
183     PendingSend* pending_send = NULL;
184     {
185       base::AutoLock auto_lock(lock_);
186       if (!pending_send_messages_.empty()) {
187         pending_send = pending_send_messages_.front();
188         pending_send_messages_.pop_front();
189       } else {
190         // Reset the event while |lock_| is still locked.
191         pending_send_event_.Reset();
192         break;
193       }
194     }
195     if (pending_send) {
196       pending_send->message.phandler->OnMessage(&pending_send->message);
197       pending_send->done_event.Signal();
198     }
199   }
200 }
201
202 void JingleThreadWrapper::PostTaskInternal(
203     int delay_ms, rtc::MessageHandler* handler,
204     uint32 message_id, rtc::MessageData* data) {
205   int task_id;
206   rtc::Message message;
207   message.phandler = handler;
208   message.message_id = message_id;
209   message.pdata = data;
210   {
211     base::AutoLock auto_lock(lock_);
212     task_id = ++last_task_id_;
213     messages_.insert(std::pair<int, rtc::Message>(task_id, message));
214   }
215
216   if (delay_ms <= 0) {
217     task_runner_->PostTask(FROM_HERE,
218                            base::Bind(&JingleThreadWrapper::RunTask,
219                                       weak_ptr_, task_id));
220   } else {
221     task_runner_->PostDelayedTask(FROM_HERE,
222                                   base::Bind(&JingleThreadWrapper::RunTask,
223                                              weak_ptr_, task_id),
224                                   base::TimeDelta::FromMilliseconds(delay_ms));
225   }
226 }
227
228 void JingleThreadWrapper::RunTask(int task_id) {
229   bool have_message = false;
230   rtc::Message message;
231   {
232     base::AutoLock auto_lock(lock_);
233     MessagesQueue::iterator it = messages_.find(task_id);
234     if (it != messages_.end()) {
235       have_message = true;
236       message = it->second;
237       messages_.erase(it);
238     }
239   }
240
241   if (have_message) {
242     if (message.message_id == rtc::MQID_DISPOSE) {
243       DCHECK(message.phandler == NULL);
244       delete message.pdata;
245     } else {
246       message.phandler->OnMessage(&message);
247     }
248   }
249 }
250
251 // All methods below are marked as not reached. See comments in the
252 // header for more details.
253 void JingleThreadWrapper::Quit() {
254   NOTREACHED();
255 }
256
257 bool JingleThreadWrapper::IsQuitting() {
258   NOTREACHED();
259   return false;
260 }
261
262 void JingleThreadWrapper::Restart() {
263   NOTREACHED();
264 }
265
266 bool JingleThreadWrapper::Get(rtc::Message*, int, bool) {
267   NOTREACHED();
268   return false;
269 }
270
271 bool JingleThreadWrapper::Peek(rtc::Message*, int) {
272   NOTREACHED();
273   return false;
274 }
275
276 void JingleThreadWrapper::PostAt(uint32, rtc::MessageHandler*,
277                                  uint32, rtc::MessageData*) {
278   NOTREACHED();
279 }
280
281 void JingleThreadWrapper::Dispatch(rtc::Message* message) {
282   NOTREACHED();
283 }
284
285 void JingleThreadWrapper::ReceiveSends() {
286   NOTREACHED();
287 }
288
289 int JingleThreadWrapper::GetDelay() {
290   NOTREACHED();
291   return 0;
292 }
293
294 void JingleThreadWrapper::Stop() {
295   NOTREACHED();
296 }
297
298 void JingleThreadWrapper::Run() {
299   NOTREACHED();
300 }
301
302 }  // namespace jingle_glue