1 // Copyright (c) 2011 The Chromium Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file.
5 #include "ipc/ipc_channel_win.h"
9 #include "base/auto_reset.h"
10 #include "base/compiler_specific.h"
11 #include "base/logging.h"
12 #include "base/threading/non_thread_safe.h"
13 #include "base/utf_string_conversions.h"
14 #include "base/win/scoped_handle.h"
15 #include "ipc/ipc_logging.h"
16 #include "ipc/ipc_message_utils.h"
20 Channel::ChannelImpl::State::State(ChannelImpl* channel) : is_pending(false) {
21 memset(&context.overlapped, 0, sizeof(context.overlapped));
22 context.handler = channel;
25 Channel::ChannelImpl::State::~State() {
26 COMPILE_ASSERT(!offsetof(Channel::ChannelImpl::State, context),
27 starts_with_io_context);
30 Channel::ChannelImpl::ChannelImpl(const IPC::ChannelHandle &channel_handle,
31 Mode mode, Listener* listener)
32 : ALLOW_THIS_IN_INITIALIZER_LIST(input_state_(this)),
33 ALLOW_THIS_IN_INITIALIZER_LIST(output_state_(this)),
34 pipe_(INVALID_HANDLE_VALUE),
36 waiting_connect_(mode & MODE_SERVER_FLAG),
37 processing_incoming_(false),
38 ALLOW_THIS_IN_INITIALIZER_LIST(factory_(this)) {
39 CreatePipe(channel_handle, mode);
42 Channel::ChannelImpl::~ChannelImpl() {
46 void Channel::ChannelImpl::Close() {
47 if (thread_check_.get()) {
48 DCHECK(thread_check_->CalledOnValidThread());
51 if (input_state_.is_pending || output_state_.is_pending)
54 // Closing the handle at this point prevents us from issuing more requests
55 // form OnIOCompleted().
56 if (pipe_ != INVALID_HANDLE_VALUE) {
58 pipe_ = INVALID_HANDLE_VALUE;
61 // Make sure all IO has completed.
62 base::Time start = base::Time::Now();
63 while (input_state_.is_pending || output_state_.is_pending) {
64 MessageLoopForIO::current()->WaitForIOCompletion(INFINITE, this);
67 while (!output_queue_.empty()) {
68 Message* m = output_queue_.front();
74 bool Channel::ChannelImpl::Send(Message* message) {
75 DCHECK(thread_check_->CalledOnValidThread());
76 DVLOG(2) << "sending message @" << message << " on channel @" << this
77 << " with type " << message->type()
78 << " (" << output_queue_.size() << " in queue)";
80 #ifdef IPC_MESSAGE_LOG_ENABLED
81 Logging::GetInstance()->OnSendMessage(message, "");
84 output_queue_.push(message);
85 // ensure waiting to write
86 if (!waiting_connect_) {
87 if (!output_state_.is_pending) {
88 if (!ProcessOutgoingMessages(NULL, 0))
97 bool Channel::ChannelImpl::IsNamedServerInitialized(
98 const std::string& channel_id) {
99 if (WaitNamedPipe(PipeName(channel_id).c_str(), 1))
101 // If ERROR_SEM_TIMEOUT occurred, the pipe exists but is handling another
103 return GetLastError() == ERROR_SEM_TIMEOUT;
107 const std::wstring Channel::ChannelImpl::PipeName(
108 const std::string& channel_id) {
109 std::string name("\\\\.\\pipe\\chrome.");
110 return ASCIIToWide(name.append(channel_id));
113 bool Channel::ChannelImpl::CreatePipe(const IPC::ChannelHandle &channel_handle,
115 DCHECK_EQ(INVALID_HANDLE_VALUE, pipe_);
116 const std::wstring pipe_name = PipeName(channel_handle.name);
117 if (mode & MODE_SERVER_FLAG) {
118 pipe_ = CreateNamedPipeW(pipe_name.c_str(),
119 PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED |
120 FILE_FLAG_FIRST_PIPE_INSTANCE,
121 PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
123 Channel::kReadBufferSize,
124 Channel::kReadBufferSize,
127 } else if (mode & MODE_CLIENT_FLAG) {
128 pipe_ = CreateFileW(pipe_name.c_str(),
129 GENERIC_READ | GENERIC_WRITE,
133 SECURITY_SQOS_PRESENT | SECURITY_IDENTIFICATION |
134 FILE_FLAG_OVERLAPPED,
139 if (pipe_ == INVALID_HANDLE_VALUE) {
140 // If this process is being closed, the pipe may be gone already.
141 LOG(WARNING) << "Unable to create pipe \"" << pipe_name <<
142 "\" in " << (mode == 0 ? "server" : "client")
143 << " mode. Error :" << GetLastError();
147 // Create the Hello message to be sent when Connect is called
148 scoped_ptr<Message> m(new Message(MSG_ROUTING_NONE,
150 IPC::Message::PRIORITY_NORMAL));
151 if (!m->WriteInt(GetCurrentProcessId())) {
153 pipe_ = INVALID_HANDLE_VALUE;
157 output_queue_.push(m.release());
161 bool Channel::ChannelImpl::Connect() {
162 DLOG_IF(WARNING, thread_check_.get()) << "Connect called more than once";
164 if (!thread_check_.get())
165 thread_check_.reset(new base::NonThreadSafe());
167 if (pipe_ == INVALID_HANDLE_VALUE)
170 MessageLoopForIO::current()->RegisterIOHandler(pipe_, this);
172 // Check to see if there is a client connected to our pipe...
173 if (waiting_connect_)
176 if (!input_state_.is_pending) {
177 // Complete setup asynchronously. By not setting input_state_.is_pending
178 // to true, we indicate to OnIOCompleted that this is the special
179 // initialization signal.
180 MessageLoopForIO::current()->PostTask(FROM_HERE, factory_.NewRunnableMethod(
181 &Channel::ChannelImpl::OnIOCompleted, &input_state_.context, 0, 0));
184 if (!waiting_connect_)
185 ProcessOutgoingMessages(NULL, 0);
189 bool Channel::ChannelImpl::ProcessConnection() {
190 DCHECK(thread_check_->CalledOnValidThread());
191 if (input_state_.is_pending)
192 input_state_.is_pending = false;
194 // Do we have a client connected to our pipe?
195 if (INVALID_HANDLE_VALUE == pipe_)
198 BOOL ok = ConnectNamedPipe(pipe_, &input_state_.context.overlapped);
200 DWORD err = GetLastError();
202 // Uhm, the API documentation says that this function should never
203 // return success when used in overlapped mode.
209 case ERROR_IO_PENDING:
210 input_state_.is_pending = true;
212 case ERROR_PIPE_CONNECTED:
213 waiting_connect_ = false;
216 // The pipe is being closed.
226 bool Channel::ChannelImpl::ProcessIncomingMessages(
227 MessageLoopForIO::IOContext* context,
229 DCHECK(thread_check_->CalledOnValidThread());
230 if (input_state_.is_pending) {
231 input_state_.is_pending = false;
234 if (!context || !bytes_read)
237 // This happens at channel initialization.
238 DCHECK(!bytes_read && context == &input_state_.context);
242 if (bytes_read == 0) {
243 if (INVALID_HANDLE_VALUE == pipe_)
247 BOOL ok = ReadFile(pipe_,
249 Channel::kReadBufferSize,
251 &input_state_.context.overlapped);
253 DWORD err = GetLastError();
254 if (err == ERROR_IO_PENDING) {
255 input_state_.is_pending = true;
258 LOG(ERROR) << "pipe error: " << err;
261 input_state_.is_pending = true;
266 // Process messages from input buffer.
269 if (input_overflow_buf_.empty()) {
271 end = p + bytes_read;
273 if (input_overflow_buf_.size() > (kMaximumMessageSize - bytes_read)) {
274 input_overflow_buf_.clear();
275 LOG(ERROR) << "IPC message is too big";
278 input_overflow_buf_.append(input_buf_, bytes_read);
279 p = input_overflow_buf_.data();
280 end = p + input_overflow_buf_.size();
284 const char* message_tail = Message::FindNext(p, end);
286 int len = static_cast<int>(message_tail - p);
287 const Message m(p, len);
288 DVLOG(2) << "received message on channel @" << this
289 << " with type " << m.type();
290 if (m.routing_id() == MSG_ROUTING_NONE &&
291 m.type() == HELLO_MESSAGE_TYPE) {
292 // The Hello message contains only the process id.
293 listener_->OnChannelConnected(MessageIterator(m).NextInt());
295 listener_->OnMessageReceived(m);
299 // Last message is partial.
303 input_overflow_buf_.assign(p, end - p);
305 bytes_read = 0; // Get more data.
311 bool Channel::ChannelImpl::ProcessOutgoingMessages(
312 MessageLoopForIO::IOContext* context,
313 DWORD bytes_written) {
314 DCHECK(!waiting_connect_); // Why are we trying to send messages if there's
316 DCHECK(thread_check_->CalledOnValidThread());
318 if (output_state_.is_pending) {
320 output_state_.is_pending = false;
321 if (!context || bytes_written == 0) {
322 DWORD err = GetLastError();
323 LOG(ERROR) << "pipe error: " << err;
327 DCHECK(!output_queue_.empty());
328 Message* m = output_queue_.front();
333 if (output_queue_.empty())
336 if (INVALID_HANDLE_VALUE == pipe_)
340 Message* m = output_queue_.front();
341 DCHECK(m->size() <= INT_MAX);
342 BOOL ok = WriteFile(pipe_,
344 static_cast<int>(m->size()),
346 &output_state_.context.overlapped);
348 DWORD err = GetLastError();
349 if (err == ERROR_IO_PENDING) {
350 output_state_.is_pending = true;
352 DVLOG(2) << "sent pending message @" << m << " on channel @" << this
353 << " with type " << m->type();
357 LOG(ERROR) << "pipe error: " << err;
361 DVLOG(2) << "sent message @" << m << " on channel @" << this
362 << " with type " << m->type();
364 output_state_.is_pending = true;
368 void Channel::ChannelImpl::OnIOCompleted(MessageLoopForIO::IOContext* context,
369 DWORD bytes_transfered, DWORD error) {
371 DCHECK(thread_check_->CalledOnValidThread());
372 if (context == &input_state_.context) {
373 if (waiting_connect_) {
374 if (!ProcessConnection())
376 // We may have some messages queued up to send...
377 if (!output_queue_.empty() && !output_state_.is_pending)
378 ProcessOutgoingMessages(NULL, 0);
379 if (input_state_.is_pending)
381 // else, fall-through and look for incoming messages...
383 // we don't support recursion through OnMessageReceived yet!
384 DCHECK(!processing_incoming_);
385 AutoReset<bool> auto_reset_processing_incoming(&processing_incoming_, true);
386 ok = ProcessIncomingMessages(context, bytes_transfered);
388 DCHECK(context == &output_state_.context);
389 ok = ProcessOutgoingMessages(context, bytes_transfered);
391 if (!ok && INVALID_HANDLE_VALUE != pipe_) {
392 // We don't want to re-enter Close().
394 listener_->OnChannelError();
398 //------------------------------------------------------------------------------
399 // Channel's methods simply call through to ChannelImpl.
400 Channel::Channel(const IPC::ChannelHandle &channel_handle, Mode mode,
402 : channel_impl_(new ChannelImpl(channel_handle, mode, listener)) {
405 Channel::~Channel() {
406 delete channel_impl_;
409 bool Channel::Connect() {
410 return channel_impl_->Connect();
413 void Channel::Close() {
414 channel_impl_->Close();
417 void Channel::set_listener(Listener* listener) {
418 channel_impl_->set_listener(listener);
421 bool Channel::Send(Message* message) {
422 return channel_impl_->Send(message);
426 bool Channel::IsNamedServerInitialized(const std::string& channel_id) {
427 return ChannelImpl::IsNamedServerInitialized(channel_id);