// Fast version called with known reactor passed in, used from derived
// classes, typically in non-cancel case
void MaybeCallOnCancel(ServerReactor* reactor) {
- if (GPR_UNLIKELY(on_cancel_conditions_remaining_.fetch_sub(
- 1, std::memory_order_acq_rel) == 1)) {
+ if (GPR_UNLIKELY(UnblockCancellation())) {
CallOnCancel(reactor);
}
}
// (such as the ServerContext CompletionOp which is formed before the
// reactor). This is used in cancel cases only, so it's ok to be slower and
// invoke a virtual function.
- void MaybeCallOnCancel() { MaybeCallOnCancel(reactor()); }
+ void MaybeCallOnCancel() {
+ if (GPR_UNLIKELY(UnblockCancellation())) {
+ CallOnCancel(reactor());
+ }
+ }
protected:
/// Increases the reference count
// it to an executor.
void CallOnCancel(ServerReactor* reactor);
+ // Implement the cancellation constraint counter. Return true if OnCancel
+ // should be called, false otherwise.
+ bool UnblockCancellation() {
+ return on_cancel_conditions_remaining_.fetch_sub(
+ 1, std::memory_order_acq_rel) == 1;
+ }
+
std::atomic_int on_cancel_conditions_remaining_{2};
std::atomic_int callbacks_outstanding_{
3}; // reserve for start, Finish, and CompletionOp
grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) {
- send_initial_metadata_wanted_ = true;
+ backlog_.send_initial_metadata_wanted = true;
return;
}
}
grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) {
- read_wanted_ = req;
+ backlog_.read_wanted = req;
return;
}
}
grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) {
- write_wanted_ = resp;
- write_options_wanted_ = std::move(options);
+ backlog_.write_wanted = resp;
+ backlog_.write_options_wanted = std::move(options);
return;
}
}
grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) {
- write_and_finish_wanted_ = true;
- write_wanted_ = resp;
- write_options_wanted_ = std::move(options);
- status_wanted_ = std::move(s);
+ backlog_.write_and_finish_wanted = true;
+ backlog_.write_wanted = resp;
+ backlog_.write_options_wanted = std::move(options);
+ backlog_.status_wanted = std::move(s);
return;
}
}
grpc::internal::MutexLock l(&stream_mu_);
stream = stream_.load(std::memory_order_relaxed);
if (stream == nullptr) {
- finish_wanted_ = true;
- status_wanted_ = std::move(s);
+ backlog_.finish_wanted = true;
+ backlog_.status_wanted = std::move(s);
return;
}
}
// customization point.
virtual void InternalBindStream(
ServerCallbackReaderWriter<Request, Response>* stream) {
+ // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
+ // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
+ // has stream, then std::get<PreBindBacklog> out of that after the lock.
+ // Do likewise with the remaining InternalBind* functions as well.
grpc::internal::ReleasableMutexLock l(&stream_mu_);
+ PreBindBacklog ops(std::move(backlog_));
stream_.store(stream, std::memory_order_release);
- if (send_initial_metadata_wanted_) {
+ l.Unlock();
+
+ if (ops.send_initial_metadata_wanted) {
stream->SendInitialMetadata();
- send_initial_metadata_wanted_ = false;
}
- if (read_wanted_ != nullptr) {
- stream->Read(read_wanted_);
- read_wanted_ = nullptr;
+ if (ops.read_wanted != nullptr) {
+ stream->Read(ops.read_wanted);
}
- if (write_and_finish_wanted_) {
- // Don't perform actual finish actions while holding lock since it could
- // trigger OnDone that destroys this object including the still-held lock.
- write_and_finish_wanted_ = false;
- const Response* write_wanted = write_wanted_;
- ::grpc::WriteOptions write_options_wanted =
- std::move(write_options_wanted_);
- ::grpc::Status status_wanted = std::move(status_wanted_);
- l.Unlock();
- stream->WriteAndFinish(write_wanted, std::move(write_options_wanted),
- std::move(status_wanted));
- return;
+ if (ops.write_and_finish_wanted) {
+ stream->WriteAndFinish(ops.write_wanted,
+ std::move(ops.write_options_wanted),
+ std::move(ops.status_wanted));
} else {
- if (write_wanted_ != nullptr) {
- stream->Write(write_wanted_, std::move(write_options_wanted_));
- write_wanted_ = nullptr;
+ if (ops.write_wanted != nullptr) {
+ stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
}
- if (finish_wanted_) {
- finish_wanted_ = false;
- ::grpc::Status status_wanted = std::move(status_wanted_);
- l.Unlock();
- stream->Finish(std::move(status_wanted));
- return;
+ if (ops.finish_wanted) {
+ stream->Finish(std::move(ops.status_wanted));
}
}
}
grpc::internal::Mutex stream_mu_;
- std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_;
- bool send_initial_metadata_wanted_ /* GUARDED_BY(stream_mu_) */ = false;
- bool write_and_finish_wanted_ /* GUARDED_BY(stream_mu_) */ = false;
- bool finish_wanted_ /* GUARDED_BY(stream_mu_) */ = false;
- Request* read_wanted_ /* GUARDED_BY(stream_mu_) */ = nullptr;
- const Response* write_wanted_ /* GUARDED_BY(stream_mu_) */ = nullptr;
- ::grpc::WriteOptions write_options_wanted_ /* GUARDED_BY(stream_mu_) */;
- ::grpc::Status status_wanted_ /* GUARDED_BY(stream_mu_) */;
+ // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
+ // once C++17 or ABSL is supported since stream and backlog are
+ // mutually exclusive in this class. Do likewise with the
+ // remaining reactor classes and their backlogs as well.
+ std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
+ struct PreBindBacklog {
+ bool send_initial_metadata_wanted = false;
+ bool write_and_finish_wanted = false;
+ bool finish_wanted = false;
+ Request* read_wanted = nullptr;
+ const Response* write_wanted = nullptr;
+ ::grpc::WriteOptions write_options_wanted;
+ ::grpc::Status status_wanted;
+ };
+ PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
};
/// \a ServerReadReactor is the interface for a client-streaming RPC.
grpc::internal::MutexLock l(&reader_mu_);
reader = reader_.load(std::memory_order_relaxed);
if (reader == nullptr) {
- send_initial_metadata_wanted_ = true;
+ backlog_.send_initial_metadata_wanted = true;
return;
}
}
grpc::internal::MutexLock l(&reader_mu_);
reader = reader_.load(std::memory_order_relaxed);
if (reader == nullptr) {
- read_wanted_ = req;
+ backlog_.read_wanted = req;
return;
}
}
grpc::internal::MutexLock l(&reader_mu_);
reader = reader_.load(std::memory_order_relaxed);
if (reader == nullptr) {
- finish_wanted_ = true;
- status_wanted_ = std::move(s);
+ backlog_.finish_wanted = true;
+ backlog_.status_wanted = std::move(s);
return;
}
}
// customization point.
virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
grpc::internal::ReleasableMutexLock l(&reader_mu_);
+ PreBindBacklog ops(std::move(backlog_));
reader_.store(reader, std::memory_order_release);
- if (send_initial_metadata_wanted_) {
+ l.Unlock();
+
+ if (ops.send_initial_metadata_wanted) {
reader->SendInitialMetadata();
- send_initial_metadata_wanted_ = false;
}
- if (read_wanted_ != nullptr) {
- reader->Read(read_wanted_);
- read_wanted_ = nullptr;
+ if (ops.read_wanted != nullptr) {
+ reader->Read(ops.read_wanted);
}
- if (finish_wanted_) {
- finish_wanted_ = false;
- ::grpc::Status status_wanted = std::move(status_wanted_);
- l.Unlock();
- reader->Finish(std::move(status_wanted));
- return;
+ if (ops.finish_wanted) {
+ reader->Finish(std::move(ops.status_wanted));
}
}
grpc::internal::Mutex reader_mu_;
- std::atomic<ServerCallbackReader<Request>*> reader_;
- bool send_initial_metadata_wanted_ /* GUARDED_BY(reader_mu_) */ = false;
- bool finish_wanted_ /* GUARDED_BY(reader_mu_) */ = false;
- Request* read_wanted_ /* GUARDED_BY(reader_mu_) */ = nullptr;
- ::grpc::Status status_wanted_ /* GUARDED_BY(reader_mu_) */;
+ std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
+ struct PreBindBacklog {
+ bool send_initial_metadata_wanted = false;
+ bool finish_wanted = false;
+ Request* read_wanted = nullptr;
+ ::grpc::Status status_wanted;
+ };
+ PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
};
/// \a ServerWriteReactor is the interface for a server-streaming RPC.
grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) {
- send_initial_metadata_wanted_ = true;
+ backlog_.send_initial_metadata_wanted = true;
return;
}
}
grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) {
- write_wanted_ = resp;
- write_options_wanted_ = std::move(options);
+ backlog_.write_wanted = resp;
+ backlog_.write_options_wanted = std::move(options);
return;
}
}
grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) {
- write_and_finish_wanted_ = true;
- write_wanted_ = resp;
- write_options_wanted_ = std::move(options);
- status_wanted_ = std::move(s);
+ backlog_.write_and_finish_wanted = true;
+ backlog_.write_wanted = resp;
+ backlog_.write_options_wanted = std::move(options);
+ backlog_.status_wanted = std::move(s);
return;
}
}
grpc::internal::MutexLock l(&writer_mu_);
writer = writer_.load(std::memory_order_relaxed);
if (writer == nullptr) {
- finish_wanted_ = true;
- status_wanted_ = std::move(s);
+ backlog_.finish_wanted = true;
+ backlog_.status_wanted = std::move(s);
return;
}
}
// customization point.
virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
grpc::internal::ReleasableMutexLock l(&writer_mu_);
+ PreBindBacklog ops(std::move(backlog_));
writer_.store(writer, std::memory_order_release);
- if (send_initial_metadata_wanted_) {
+ l.Unlock();
+
+ if (ops.send_initial_metadata_wanted) {
writer->SendInitialMetadata();
- send_initial_metadata_wanted_ = false;
}
- if (write_and_finish_wanted_) {
- write_and_finish_wanted_ = false;
- const Response* write_wanted = write_wanted_;
- ::grpc::WriteOptions write_options_wanted =
- std::move(write_options_wanted_);
- ::grpc::Status status_wanted = std::move(status_wanted_);
- l.Unlock();
- writer->WriteAndFinish(write_wanted, std::move(write_options_wanted),
- std::move(status_wanted));
- return;
+ if (ops.write_and_finish_wanted) {
+ writer->WriteAndFinish(ops.write_wanted,
+ std::move(ops.write_options_wanted),
+ std::move(ops.status_wanted));
} else {
- if (write_wanted_ != nullptr) {
- writer->Write(write_wanted_, std::move(write_options_wanted_));
- write_wanted_ = nullptr;
+ if (ops.write_wanted != nullptr) {
+ writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
}
- if (finish_wanted_) {
- finish_wanted_ = false;
- ::grpc::Status status_wanted = std::move(status_wanted_);
- l.Unlock();
- writer->Finish(std::move(status_wanted));
- return;
+ if (ops.finish_wanted) {
+ writer->Finish(std::move(ops.status_wanted));
}
}
}
grpc::internal::Mutex writer_mu_;
- std::atomic<ServerCallbackWriter<Response>*> writer_;
- bool send_initial_metadata_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
- bool write_and_finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
- bool finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
- const Response* write_wanted_ /* GUARDED_BY(writer_mu_) */ = nullptr;
- ::grpc::WriteOptions write_options_wanted_ /* GUARDED_BY(writer_mu_) */;
- ::grpc::Status status_wanted_ /* GUARDED_BY(writer_mu_) */;
+ std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
+ struct PreBindBacklog {
+ bool send_initial_metadata_wanted = false;
+ bool write_and_finish_wanted = false;
+ bool finish_wanted = false;
+ const Response* write_wanted = nullptr;
+ ::grpc::WriteOptions write_options_wanted;
+ ::grpc::Status status_wanted;
+ };
+ PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
};
class ServerUnaryReactor : public internal::ServerReactor {
grpc::internal::MutexLock l(&call_mu_);
call = call_.load(std::memory_order_relaxed);
if (call == nullptr) {
- send_initial_metadata_wanted_ = true;
+ backlog_.send_initial_metadata_wanted = true;
return;
}
}
grpc::internal::MutexLock l(&call_mu_);
call = call_.load(std::memory_order_relaxed);
if (call == nullptr) {
- finish_wanted_ = true;
- status_wanted_ = std::move(s);
+ backlog_.finish_wanted = true;
+ backlog_.status_wanted = std::move(s);
return;
}
}
// customization point.
virtual void InternalBindCall(ServerCallbackUnary* call) {
grpc::internal::ReleasableMutexLock l(&call_mu_);
+ PreBindBacklog ops(std::move(backlog_));
call_.store(call, std::memory_order_release);
- if (send_initial_metadata_wanted_) {
+ l.Unlock();
+
+ if (ops.send_initial_metadata_wanted) {
call->SendInitialMetadata();
- send_initial_metadata_wanted_ = false;
}
- if (finish_wanted_) {
- finish_wanted_ = false;
- ::grpc::Status status_wanted = std::move(status_wanted_);
- l.Unlock();
- call->Finish(std::move(status_wanted));
- return;
+ if (ops.finish_wanted) {
+ call->Finish(std::move(ops.status_wanted));
}
}
grpc::internal::Mutex call_mu_;
- std::atomic<ServerCallbackUnary*> call_;
- bool send_initial_metadata_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
- bool finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
- ::grpc::Status status_wanted_ /* GUARDED_BY(writer_mu_) */;
+ std::atomic<ServerCallbackUnary*> call_{nullptr};
+ struct PreBindBacklog {
+ bool send_initial_metadata_wanted = false;
+ bool finish_wanted = false;
+ ::grpc::Status status_wanted;
+ };
+ PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
};
namespace internal {