Thread* result = CurrentThread();
if (NULL == result) {
result = new Thread();
- result->WrapCurrentWithThreadManager(this);
+ result->WrapCurrentWithThreadManager(this, true);
}
return result;
}
bool Thread::SetPriority(ThreadPriority priority) {
#if defined(WEBRTC_WIN)
if (running()) {
+ ASSERT(thread_ != NULL);
BOOL ret = FALSE;
if (priority == PRIORITY_NORMAL) {
ret = ::SetThreadPriority(thread_, THREAD_PRIORITY_NORMAL);
return true;
}
+bool Thread::WrapCurrent() {
+ return WrapCurrentWithThreadManager(ThreadManager::Instance(), true);
+}
+
+void Thread::UnwrapCurrent() {
+ // Clears the platform-specific thread-specific storage.
+ ThreadManager::Instance()->SetCurrentThread(NULL);
+#if defined(WEBRTC_WIN)
+ if (thread_ != NULL) {
+ if (!CloseHandle(thread_)) {
+ LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
+ }
+ thread_ = NULL;
+ }
+#endif
+ running_.Reset();
+}
+
+void Thread::SafeWrapCurrent() {
+ WrapCurrentWithThreadManager(ThreadManager::Instance(), false);
+}
+
void Thread::Join() {
AssertBlockingIsAllowedOnCurrentThread();
if (running()) {
ASSERT(!IsCurrent());
#if defined(WEBRTC_WIN)
+ ASSERT(thread_ != NULL);
WaitForSingleObject(thread_, INFINITE);
CloseHandle(thread_);
thread_ = NULL;
}
void Thread::Send(MessageHandler *phandler, uint32 id, MessageData *pdata) {
- AssertBlockingIsAllowedOnCurrentThread();
-
if (fStop_)
return;
// Sent messages are sent to the MessageHandler directly, in the context
// of "thread", like Win32 SendMessage. If in the right context,
// call the handler directly.
-
Message msg;
msg.phandler = phandler;
msg.message_id = id;
return;
}
+ AssertBlockingIsAllowedOnCurrentThread();
+
AutoThread thread;
Thread *current_thread = Thread::Current();
ASSERT(current_thread != NULL); // AutoThread ensures this
crit_.Enter();
while (!ready) {
crit_.Leave();
- current_thread->ReceiveSends();
+ // We need to limit "ReceiveSends" to |this| thread to avoid an arbitrary
+ // thread invoking calls on the current thread.
+ current_thread->ReceiveSendsFromThread(this);
current_thread->socketserver()->Wait(kForever, false);
waited = true;
crit_.Enter();
}
void Thread::ReceiveSends() {
+ ReceiveSendsFromThread(NULL);
+}
+
+void Thread::ReceiveSendsFromThread(const Thread* source) {
// Receive a sent message. Cleanup scenarios:
// - thread sending exits: We don't allow this, since thread can exit
// only via Join, so Send must complete.
// - thread receiving exits: Wakeup/set ready in Thread::Clear()
// - object target cleared: Wakeup/set ready in Thread::Clear()
+ _SendMessage smsg;
+
crit_.Enter();
- while (!sendlist_.empty()) {
- _SendMessage smsg = sendlist_.front();
- sendlist_.pop_front();
+ while (PopSendMessageFromThread(source, &smsg)) {
crit_.Leave();
+
smsg.msg.phandler->OnMessage(&smsg.msg);
+
crit_.Enter();
*smsg.ready = true;
smsg.thread->socketserver()->WakeUp();
crit_.Leave();
}
+bool Thread::PopSendMessageFromThread(const Thread* source, _SendMessage* msg) {
+ for (std::list<_SendMessage>::iterator it = sendlist_.begin();
+ it != sendlist_.end(); ++it) {
+ if (it->thread == source || source == NULL) {
+ *msg = *it;
+ sendlist_.erase(it);
+ return true;
+ }
+ }
+ return false;
+}
+
void Thread::Clear(MessageHandler *phandler, uint32 id,
MessageList* removed) {
CritScope cs(&crit_);
}
}
-bool Thread::WrapCurrent() {
- return WrapCurrentWithThreadManager(ThreadManager::Instance());
-}
-
-bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager) {
+bool Thread::WrapCurrentWithThreadManager(ThreadManager* thread_manager,
+ bool need_synchronize_access) {
if (running())
return false;
+
#if defined(WEBRTC_WIN)
- // We explicitly ask for no rights other than synchronization.
- // This gives us the best chance of succeeding.
- thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
- if (!thread_) {
- LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
- return false;
+ if (need_synchronize_access) {
+ // We explicitly ask for no rights other than synchronization.
+ // This gives us the best chance of succeeding.
+ thread_ = OpenThread(SYNCHRONIZE, FALSE, GetCurrentThreadId());
+ if (!thread_) {
+ LOG_GLE(LS_ERROR) << "Unable to get handle to thread.";
+ return false;
+ }
+ thread_id_ = GetCurrentThreadId();
}
- thread_id_ = GetCurrentThreadId();
#elif defined(WEBRTC_POSIX)
thread_ = pthread_self();
#endif
+
owned_ = false;
running_.Set();
thread_manager->SetCurrentThread(this);
return true;
}
-void Thread::UnwrapCurrent() {
- // Clears the platform-specific thread-specific storage.
- ThreadManager::Instance()->SetCurrentThread(NULL);
-#if defined(WEBRTC_WIN)
- if (!CloseHandle(thread_)) {
- LOG_GLE(LS_ERROR) << "When unwrapping thread, failed to close handle.";
- }
-#endif
- running_.Reset();
-}
-
-
AutoThread::AutoThread(SocketServer* ss) : Thread(ss) {
if (!ThreadManager::Instance()->CurrentThread()) {
ThreadManager::Instance()->SetCurrentThread(this);