bool WriteFrontMessageNoLock();
// Cancels all pending writes and destroys the contents of
- // |write_message_queue_|. Should only be called if |is_dead_| is false; sets
- // |is_dead_| to true. Must be called under |write_lock_|.
+ // |write_message_queue_|. Should only be called if |write_stopped_| is false;
+ // sets |write_stopped_| to true. Must be called under |write_lock_|.
void CancelPendingWritesNoLock();
base::MessageLoopForIO* message_loop_for_io() {
size_t read_buffer_num_valid_bytes_;
base::Lock write_lock_; // Protects the following members.
- bool is_dead_;
+ bool write_stopped_;
std::deque<MessageInTransit*> write_message_queue_;
size_t write_message_offset_;
// This is used for posting tasks from write threads to the I/O thread. It
: RawChannel(delegate, message_loop),
fd_(handle.Pass()),
read_buffer_num_valid_bytes_(0),
- is_dead_(false),
+ write_stopped_(false),
write_message_offset_(0),
weak_ptr_factory_(this) {
CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
}
RawChannelPosix::~RawChannelPosix() {
- DCHECK(is_dead_);
+ DCHECK(write_stopped_);
DCHECK(!fd_.is_valid());
// No need to take the |write_lock_| here -- if there are still weak pointers
DCHECK_EQ(base::MessageLoop::current(), message_loop());
base::AutoLock locker(write_lock_);
- if (!is_dead_)
+ if (!write_stopped_)
CancelPendingWritesNoLock();
+ read_watcher_.reset(); // This will stop watching (if necessary).
+ write_watcher_.reset(); // This will stop watching (if necessary).
+
DCHECK(fd_.is_valid());
fd_.reset();
weak_ptr_factory_.InvalidateWeakPtrs();
-
- read_watcher_.reset(); // This will stop watching (if necessary).
- write_watcher_.reset(); // This will stop watching (if necessary).
}
-// Reminder: This must be thread-safe, and takes ownership of |message| on
-// success.
+// Reminder: This must be thread-safe, and takes ownership of |message|.
bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
base::AutoLock locker(write_lock_);
- if (is_dead_) {
+ if (write_stopped_) {
message->Destroy();
return false;
}
if (bytes_read < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
PLOG(ERROR) << "read";
- {
- base::AutoLock locker(write_lock_);
- CancelPendingWritesNoLock();
- }
+
+ // Make sure that |OnFileCanReadWithoutBlocking()| won't be called
+ // again.
+ read_watcher_.reset();
+
CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
return;
}
bool did_fail = false;
{
base::AutoLock locker(write_lock_);
- DCHECK(!is_dead_);
- DCHECK(!write_message_queue_.empty());
+ DCHECK_EQ(write_stopped_, write_message_queue_.empty());
+
+ if (write_stopped_) {
+ write_watcher_.reset();
+ return;
+ }
bool result = WriteFrontMessageNoLock();
DCHECK(result || write_message_queue_.empty());
- if (!result)
+ if (!result) {
did_fail = true;
- else if (!write_message_queue_.empty())
+ write_watcher_.reset();
+ } else if (!write_message_queue_.empty()) {
WaitToWrite();
+ }
}
if (did_fail)
CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
bool RawChannelPosix::WriteFrontMessageNoLock() {
write_lock_.AssertAcquired();
- DCHECK(!is_dead_);
+ DCHECK(!write_stopped_);
DCHECK(!write_message_queue_.empty());
MessageInTransit* message = write_message_queue_.front();
void RawChannelPosix::CancelPendingWritesNoLock() {
write_lock_.AssertAcquired();
- DCHECK(!is_dead_);
+ DCHECK(!write_stopped_);
- is_dead_ = true;
+ write_stopped_ = true;
for (std::deque<MessageInTransit*>::iterator it =
write_message_queue_.begin(); it != write_message_queue_.end();
++it) {