#include "tools/android/forwarder2/forwarders_manager.h"
+#include <sys/select.h>
+#include <unistd.h>
+
#include <algorithm>
#include "base/basictypes.h"
#include "base/bind.h"
+#include "base/callback_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
+#include "base/posix/eintr_wrapper.h"
#include "tools/android/forwarder2/forwarder.h"
#include "tools/android/forwarder2/socket.h"
namespace forwarder2 {
-ForwardersManager::ForwardersManager() : delegate_(new Delegate()) {}
+ForwardersManager::ForwardersManager() : thread_("ForwardersManagerThread") {
+ thread_.Start();
+ WaitForEventsOnInternalThreadSoon();
+}
+
ForwardersManager::~ForwardersManager() {
- delegate_->Clear();
+ deletion_notifier_.Notify();
}
void ForwardersManager::CreateAndStartNewForwarder(scoped_ptr<Socket> socket1,
scoped_ptr<Socket> socket2) {
- delegate_->CreateAndStartNewForwarder(socket1.Pass(), socket2.Pass());
+ // Note that the internal Forwarder vector is populated on the internal thread
+ // which is the only thread from which it's accessed.
+ thread_.message_loop_proxy()->PostTask(
+ FROM_HERE,
+ base::Bind(&ForwardersManager::CreateNewForwarderOnInternalThread,
+ base::Unretained(this), base::Passed(&socket1),
+ base::Passed(&socket2)));
+
+ // Guarantees that the CreateNewForwarderOnInternalThread callback posted to
+ // the internal thread gets executed immediately.
+ wakeup_notifier_.Notify();
}
-ForwardersManager::Delegate::Delegate() {}
+void ForwardersManager::CreateNewForwarderOnInternalThread(
+ scoped_ptr<Socket> socket1,
+ scoped_ptr<Socket> socket2) {
+ DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
+ forwarders_.push_back(new Forwarder(socket1.Pass(), socket2.Pass()));
+}
-ForwardersManager::Delegate::~Delegate() {
- // The forwarder instances should already have been deleted on their
- // construction thread. Deleting them here would be unsafe since we don't know
- // which thread this destructor is called on.
- DCHECK(forwarders_.empty());
+void ForwardersManager::WaitForEventsOnInternalThreadSoon() {
+ thread_.message_loop_proxy()->PostTask(
+ FROM_HERE,
+ base::Bind(&ForwardersManager::WaitForEventsOnInternalThread,
+ base::Unretained(this)));
}
-void ForwardersManager::Delegate::Clear() {
- if (!forwarders_constructor_runner_) {
- DCHECK(forwarders_.empty());
- return;
+void ForwardersManager::WaitForEventsOnInternalThread() {
+ DCHECK(thread_.message_loop_proxy()->RunsTasksOnCurrentThread());
+ fd_set read_fds;
+ fd_set write_fds;
+
+ FD_ZERO(&read_fds);
+ FD_ZERO(&write_fds);
+
+ // Populate the file descriptor sets.
+ int max_fd = -1;
+ for (ScopedVector<Forwarder>::iterator it = forwarders_.begin();
+ it != forwarders_.end(); ++it) {
+ Forwarder* const forwarder = *it;
+ forwarder->RegisterFDs(&read_fds, &write_fds, &max_fd);
+ }
+
+ const int notifier_fds[] = {
+ wakeup_notifier_.receiver_fd(),
+ deletion_notifier_.receiver_fd(),
+ };
+
+ for (int i = 0; i < arraysize(notifier_fds); ++i) {
+ const int notifier_fd = notifier_fds[i];
+ DCHECK_GT(notifier_fd, -1);
+ FD_SET(notifier_fd, &read_fds);
+ max_fd = std::max(max_fd, notifier_fd);
}
- if (forwarders_constructor_runner_->RunsTasksOnCurrentThread()) {
- ClearOnForwarderConstructorThread();
+
+ const int ret = HANDLE_EINTR(
+ select(max_fd + 1, &read_fds, &write_fds, NULL, NULL));
+ if (ret < 0) {
+ PLOG(ERROR) << "select";
return;
}
- forwarders_constructor_runner_->PostTask(
- FROM_HERE,
- base::Bind(
- &ForwardersManager::Delegate::ClearOnForwarderConstructorThread,
- this));
-}
-void ForwardersManager::Delegate::CreateAndStartNewForwarder(
- scoped_ptr<Socket> socket1,
- scoped_ptr<Socket> socket2) {
- const scoped_refptr<base::SingleThreadTaskRunner> current_task_runner(
- base::MessageLoopProxy::current());
- DCHECK(current_task_runner);
- if (forwarders_constructor_runner_) {
- DCHECK_EQ(current_task_runner, forwarders_constructor_runner_);
- } else {
- forwarders_constructor_runner_ = current_task_runner;
+ const bool must_shutdown = FD_ISSET(
+ deletion_notifier_.receiver_fd(), &read_fds);
+ if (must_shutdown && forwarders_.empty())
+ return;
+
+ base::ScopedClosureRunner wait_for_events_soon(
+ base::Bind(&ForwardersManager::WaitForEventsOnInternalThreadSoon,
+ base::Unretained(this)));
+
+ if (FD_ISSET(wakeup_notifier_.receiver_fd(), &read_fds)) {
+ // Note that the events on FDs other than the wakeup notifier one, if any,
+ // will be processed upon the next select().
+ wakeup_notifier_.Reset();
+ return;
}
- forwarders_.push_back(
- new Forwarder(socket1.Pass(), socket2.Pass(),
- &deletion_notifier_,
- base::Bind(&ForwardersManager::Delegate::OnForwarderError,
- this)));
- forwarders_.back()->Start();
-}
-void ForwardersManager::Delegate::OnForwarderError(
- scoped_ptr<Forwarder> forwarder) {
- DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
- const ScopedVector<Forwarder>::iterator it = std::find(
- forwarders_.begin(), forwarders_.end(), forwarder.get());
- DCHECK(it != forwarders_.end());
- std::swap(*it, forwarders_.back());
- forwarders_.pop_back();
- ignore_result(forwarder.release()); // Deleted by the pop_back() above.
-}
+ // Notify the Forwarder instances and remove the ones that are closed.
+ for (size_t i = 0; i < forwarders_.size(); ) {
+ Forwarder* const forwarder = forwarders_[i];
+ forwarder->ProcessEvents(read_fds, write_fds);
-void ForwardersManager::Delegate::ClearOnForwarderConstructorThread() {
- DCHECK(forwarders_constructor_runner_->RunsTasksOnCurrentThread());
- deletion_notifier_.Notify();
- forwarders_.clear();
+ if (must_shutdown)
+ forwarder->Shutdown();
+
+ if (!forwarder->IsClosed()) {
+ ++i;
+ continue;
+ }
+
+ std::swap(forwarders_[i], forwarders_.back());
+ forwarders_.pop_back();
+ }
}
} // namespace forwarder2