#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
+#include "base/memory/singleton.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
+#include "base/synchronization/lock.h"
+#include "base/synchronization/waitable_event.h"
#include "base/threading/thread.h"
-#include "base/time/tick_clock.h"
+#include "base/threading/thread_restrictions.h"
#include "base/time/time.h"
#include "mojo/common/message_pump_mojo.h"
#include "mojo/common/message_pump_mojo_handler.h"
+#include "mojo/common/time_helper.h"
namespace mojo {
namespace common {
return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
}
+base::TimeTicks MojoDeadlineToTimeTicks(MojoDeadline deadline) {
+ return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
+ internal::NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
+}
+
// Tracks the data for a single call to Start().
struct WatchData {
WatchData()
: id(0),
- wait_flags(MOJO_WAIT_FLAG_NONE),
+ handle_signals(MOJO_HANDLE_SIGNAL_NONE),
message_loop(NULL) {}
WatcherID id;
Handle handle;
- MojoWaitFlags wait_flags;
+ MojoHandleSignals handle_signals;
base::TimeTicks deadline;
base::Callback<void(MojoResult)> callback;
scoped_refptr<base::MessageLoopProxy> message_loop;
virtual ~WatcherBackend();
void StartWatching(const WatchData& data);
- void StopWatching(WatcherID watcher_id);
+
+ // Cancels a previously schedule request to start a watch. When done signals
+ // |event|.
+ void StopWatching(WatcherID watcher_id, base::WaitableEvent* event);
private:
typedef std::map<Handle, WatchData> HandleToWatchDataMap;
handle_to_data_[data.handle] = data;
message_pump_mojo->AddHandler(this, data.handle,
- data.wait_flags,
+ data.handle_signals,
data.deadline);
}
-void WatcherBackend::StopWatching(WatcherID watcher_id) {
+void WatcherBackend::StopWatching(WatcherID watcher_id,
+ base::WaitableEvent* event) {
// Because of the thread hop it is entirely possible to get here and not
// have a valid handle registered for |watcher_id|.
Handle handle;
- if (!GetMojoHandleByWatcherID(watcher_id, &handle))
- return;
-
- handle_to_data_.erase(handle);
- message_pump_mojo->RemoveHandler(handle);
+ if (GetMojoHandleByWatcherID(watcher_id, &handle)) {
+ handle_to_data_.erase(handle);
+ message_pump_mojo->RemoveHandler(handle);
+ }
+ event->Signal();
}
void WatcherBackend::RemoveAndNotify(const Handle& handle,
// WatcherThreadManager manages the background thread that listens for handles
// to be ready. All requests are handled by WatcherBackend.
+} // namespace
+
class WatcherThreadManager {
public:
+ ~WatcherThreadManager();
+
// Returns the shared instance.
static WatcherThreadManager* GetInstance();
// on the thread StartWatching() was invoked on.
// This may be invoked on any thread.
WatcherID StartWatching(const Handle& handle,
- MojoWaitFlags wait_flags,
+ MojoHandleSignals handle_signals,
base::TimeTicks deadline,
const base::Callback<void(MojoResult)>& callback);
void StopWatching(WatcherID watcher_id);
private:
- friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
-
+ friend struct DefaultSingletonTraits<WatcherThreadManager>;
WatcherThreadManager();
- ~WatcherThreadManager();
base::Thread thread_;
DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
};
+WatcherThreadManager::~WatcherThreadManager() {
+ thread_.Stop();
+}
+
WatcherThreadManager* WatcherThreadManager::GetInstance() {
- static base::LazyInstance<WatcherThreadManager> instance =
- LAZY_INSTANCE_INITIALIZER;
- return &instance.Get();
+ return Singleton<WatcherThreadManager>::get();
}
WatcherID WatcherThreadManager::StartWatching(
const Handle& handle,
- MojoWaitFlags wait_flags,
+ MojoHandleSignals handle_signals,
base::TimeTicks deadline,
const base::Callback<void(MojoResult)>& callback) {
WatchData data;
data.id = watcher_id_generator_.GetNext();
data.handle = handle;
data.callback = callback;
- data.wait_flags = wait_flags;
+ data.handle_signals = handle_signals;
data.deadline = deadline;
data.message_loop = base::MessageLoopProxy::current();
- DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL), data.message_loop);
- // We outlive |thread_|, so it's safe to use Unretained() here.
+ DCHECK_NE(static_cast<base::MessageLoopProxy*>(NULL),
+ data.message_loop.get());
+ // We own |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StartWatching,
}
void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
- // We outlive |thread_|, so it's safe to use Unretained() here.
+ base::ThreadRestrictions::ScopedAllowWait allow_wait;
+ base::WaitableEvent event(true, false);
+ // We own |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StopWatching,
base::Unretained(&backend_),
- watcher_id));
+ watcher_id,
+ &event));
+
+ // We need to block until the handle is actually removed.
+ event.Wait();
}
WatcherThreadManager::WatcherThreadManager()
thread_.StartWithOptions(thread_options);
}
-WatcherThreadManager::~WatcherThreadManager() {
- thread_.Stop();
-}
+// HandleWatcher::State --------------------------------------------------------
-} // namespace
+// Represents the state of the HandleWatcher. Owns the user's callback and
+// monitors the current thread's MessageLoop to know when to force the callback
+// to run (with an error) even though the pipe hasn't been signaled yet.
+class HandleWatcher::State : public base::MessageLoop::DestructionObserver {
+ public:
+ State(HandleWatcher* watcher,
+ const Handle& handle,
+ MojoHandleSignals handle_signals,
+ MojoDeadline deadline,
+ const base::Callback<void(MojoResult)>& callback)
+ : watcher_(watcher),
+ callback_(callback),
+ weak_factory_(this) {
+ base::MessageLoop::current()->AddDestructionObserver(this);
+
+ watcher_id_ = WatcherThreadManager::GetInstance()->StartWatching(
+ handle,
+ handle_signals,
+ MojoDeadlineToTimeTicks(deadline),
+ base::Bind(&State::OnHandleReady, weak_factory_.GetWeakPtr()));
+ }
-// HandleWatcher::StartState ---------------------------------------------------
+ virtual ~State() {
+ base::MessageLoop::current()->RemoveDestructionObserver(this);
-// Contains the information passed to Start().
-struct HandleWatcher::StartState {
- explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
+ WatcherThreadManager::GetInstance()->StopWatching(watcher_id_);
}
- ~StartState() {
+ private:
+ virtual void WillDestroyCurrentMessageLoop() OVERRIDE {
+ // The current thread is exiting. Simulate a watch error.
+ OnHandleReady(MOJO_RESULT_ABORTED);
}
- // ID assigned by WatcherThreadManager.
- WatcherID watcher_id;
+ void OnHandleReady(MojoResult result) {
+ base::Callback<void(MojoResult)> callback = callback_;
+ watcher_->Stop(); // Destroys |this|.
- // Callback to notify when done.
- base::Callback<void(MojoResult)> callback;
+ callback.Run(result);
+ }
+
+ HandleWatcher* watcher_;
+ WatcherID watcher_id_;
+ base::Callback<void(MojoResult)> callback_;
- // When Start() is invoked a callback is passed to WatcherThreadManager
- // using a WeakRef from |weak_refactory_|. The callback invokes
- // OnHandleReady() (on the thread Start() is invoked from) which in turn
- // notifies |callback_|. Doing this allows us to reset state when the handle
- // is ready, and then notify the callback. Doing this also means Stop()
- // cancels any pending callbacks that may be inflight.
- base::WeakPtrFactory<HandleWatcher> weak_factory;
+ // Used to weakly bind |this| to the WatcherThreadManager.
+ base::WeakPtrFactory<State> weak_factory_;
};
// HandleWatcher ---------------------------------------------------------------
-// static
-base::TickClock* HandleWatcher::tick_clock_ = NULL;
-
HandleWatcher::HandleWatcher() {
}
HandleWatcher::~HandleWatcher() {
- Stop();
}
void HandleWatcher::Start(const Handle& handle,
- MojoWaitFlags wait_flags,
+ MojoHandleSignals handle_signals,
MojoDeadline deadline,
const base::Callback<void(MojoResult)>& callback) {
DCHECK(handle.is_valid());
- DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
-
- Stop();
-
- start_state_.reset(new StartState(this));
- start_state_->callback = callback;
- start_state_->watcher_id =
- WatcherThreadManager::GetInstance()->StartWatching(
- handle,
- wait_flags,
- MojoDeadlineToTimeTicks(deadline),
- base::Bind(&HandleWatcher::OnHandleReady,
- start_state_->weak_factory.GetWeakPtr()));
-}
+ DCHECK_NE(MOJO_HANDLE_SIGNAL_NONE, handle_signals);
-void HandleWatcher::Stop() {
- if (!start_state_.get())
- return;
-
- scoped_ptr<StartState> old_state(start_state_.Pass());
- WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
-}
-
-void HandleWatcher::OnHandleReady(MojoResult result) {
- DCHECK(start_state_.get());
- scoped_ptr<StartState> old_state(start_state_.Pass());
- old_state->callback.Run(result);
-
- // NOTE: We may have been deleted during callback execution.
-}
-
-// static
-base::TimeTicks HandleWatcher::NowTicks() {
- return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
+ state_.reset(new State(this, handle, handle_signals, deadline, callback));
}
-// static
-base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
- return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
- NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
+void HandleWatcher::Stop() {
+ state_.reset();
}
} // namespace common