Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_impl.h
index ccdcb6a..781e96f 100644 (file)
@@ -82,8 +82,7 @@ class ServerCallbackCall {
   // Fast version called with known reactor passed in, used from derived
   // classes, typically in non-cancel case
   void MaybeCallOnCancel(ServerReactor* reactor) {
-    if (GPR_UNLIKELY(on_cancel_conditions_remaining_.fetch_sub(
-                         1, std::memory_order_acq_rel) == 1)) {
+    if (GPR_UNLIKELY(UnblockCancellation())) {
       CallOnCancel(reactor);
     }
   }
@@ -92,7 +91,11 @@ class ServerCallbackCall {
   // (such as the ServerContext CompletionOp which is formed before the
   // reactor). This is used in cancel cases only, so it's ok to be slower and
   // invoke a virtual function.
-  void MaybeCallOnCancel() { MaybeCallOnCancel(reactor()); }
+  void MaybeCallOnCancel() {
+    if (GPR_UNLIKELY(UnblockCancellation())) {
+      CallOnCancel(reactor());
+    }
+  }
 
  protected:
   /// Increases the reference count
@@ -111,6 +114,13 @@ class ServerCallbackCall {
   // it to an executor.
   void CallOnCancel(ServerReactor* reactor);
 
+  // Implement the cancellation constraint counter. Return true if OnCancel
+  // should be called, false otherwise.
+  bool UnblockCancellation() {
+    return on_cancel_conditions_remaining_.fetch_sub(
+               1, std::memory_order_acq_rel) == 1;
+  }
+
   std::atomic_int on_cancel_conditions_remaining_{2};
   std::atomic_int callbacks_outstanding_{
       3};  // reserve for start, Finish, and CompletionOp
@@ -240,7 +250,7 @@ class ServerBidiReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&stream_mu_);
       stream = stream_.load(std::memory_order_relaxed);
       if (stream == nullptr) {
-        send_initial_metadata_wanted_ = true;
+        backlog_.send_initial_metadata_wanted = true;
         return;
       }
     }
@@ -258,7 +268,7 @@ class ServerBidiReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&stream_mu_);
       stream = stream_.load(std::memory_order_relaxed);
       if (stream == nullptr) {
-        read_wanted_ = req;
+        backlog_.read_wanted = req;
         return;
       }
     }
@@ -287,8 +297,8 @@ class ServerBidiReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&stream_mu_);
       stream = stream_.load(std::memory_order_relaxed);
       if (stream == nullptr) {
-        write_wanted_ = resp;
-        write_options_wanted_ = std::move(options);
+        backlog_.write_wanted = resp;
+        backlog_.write_options_wanted = std::move(options);
         return;
       }
     }
@@ -316,10 +326,10 @@ class ServerBidiReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&stream_mu_);
       stream = stream_.load(std::memory_order_relaxed);
       if (stream == nullptr) {
-        write_and_finish_wanted_ = true;
-        write_wanted_ = resp;
-        write_options_wanted_ = std::move(options);
-        status_wanted_ = std::move(s);
+        backlog_.write_and_finish_wanted = true;
+        backlog_.write_wanted = resp;
+        backlog_.write_options_wanted = std::move(options);
+        backlog_.status_wanted = std::move(s);
         return;
       }
     }
@@ -351,8 +361,8 @@ class ServerBidiReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&stream_mu_);
       stream = stream_.load(std::memory_order_relaxed);
       if (stream == nullptr) {
-        finish_wanted_ = true;
-        status_wanted_ = std::move(s);
+        backlog_.finish_wanted = true;
+        backlog_.status_wanted = std::move(s);
         return;
       }
     }
@@ -396,52 +406,51 @@ class ServerBidiReactor : public internal::ServerReactor {
   // customization point.
   virtual void InternalBindStream(
       ServerCallbackReaderWriter<Request, Response>* stream) {
+    // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
+    // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
+    // has stream, then std::get<PreBindBacklog> out of that after the lock.
+    // Do likewise with the remaining InternalBind* functions as well.
     grpc::internal::ReleasableMutexLock l(&stream_mu_);
+    PreBindBacklog ops(std::move(backlog_));
     stream_.store(stream, std::memory_order_release);
-    if (send_initial_metadata_wanted_) {
+    l.Unlock();
+
+    if (ops.send_initial_metadata_wanted) {
       stream->SendInitialMetadata();
-      send_initial_metadata_wanted_ = false;
     }
-    if (read_wanted_ != nullptr) {
-      stream->Read(read_wanted_);
-      read_wanted_ = nullptr;
+    if (ops.read_wanted != nullptr) {
+      stream->Read(ops.read_wanted);
     }
-    if (write_and_finish_wanted_) {
-      // Don't perform actual finish actions while holding lock since it could
-      // trigger OnDone that destroys this object including the still-held lock.
-      write_and_finish_wanted_ = false;
-      const Response* write_wanted = write_wanted_;
-      ::grpc::WriteOptions write_options_wanted =
-          std::move(write_options_wanted_);
-      ::grpc::Status status_wanted = std::move(status_wanted_);
-      l.Unlock();
-      stream->WriteAndFinish(write_wanted, std::move(write_options_wanted),
-                             std::move(status_wanted));
-      return;
+    if (ops.write_and_finish_wanted) {
+      stream->WriteAndFinish(ops.write_wanted,
+                             std::move(ops.write_options_wanted),
+                             std::move(ops.status_wanted));
     } else {
-      if (write_wanted_ != nullptr) {
-        stream->Write(write_wanted_, std::move(write_options_wanted_));
-        write_wanted_ = nullptr;
+      if (ops.write_wanted != nullptr) {
+        stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
       }
-      if (finish_wanted_) {
-        finish_wanted_ = false;
-        ::grpc::Status status_wanted = std::move(status_wanted_);
-        l.Unlock();
-        stream->Finish(std::move(status_wanted));
-        return;
+      if (ops.finish_wanted) {
+        stream->Finish(std::move(ops.status_wanted));
       }
     }
   }
 
   grpc::internal::Mutex stream_mu_;
-  std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_;
-  bool send_initial_metadata_wanted_ /* GUARDED_BY(stream_mu_) */ = false;
-  bool write_and_finish_wanted_ /* GUARDED_BY(stream_mu_) */ = false;
-  bool finish_wanted_ /* GUARDED_BY(stream_mu_) */ = false;
-  Request* read_wanted_ /* GUARDED_BY(stream_mu_) */ = nullptr;
-  const Response* write_wanted_ /* GUARDED_BY(stream_mu_) */ = nullptr;
-  ::grpc::WriteOptions write_options_wanted_ /* GUARDED_BY(stream_mu_) */;
-  ::grpc::Status status_wanted_ /* GUARDED_BY(stream_mu_) */;
+  // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
+  //              once C++17 or ABSL is supported since stream and backlog are
+  //              mutually exclusive in this class. Do likewise with the
+  //              remaining reactor classes and their backlogs as well.
+  std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
+  struct PreBindBacklog {
+    bool send_initial_metadata_wanted = false;
+    bool write_and_finish_wanted = false;
+    bool finish_wanted = false;
+    Request* read_wanted = nullptr;
+    const Response* write_wanted = nullptr;
+    ::grpc::WriteOptions write_options_wanted;
+    ::grpc::Status status_wanted;
+  };
+  PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
 };
 
 /// \a ServerReadReactor is the interface for a client-streaming RPC.
@@ -459,7 +468,7 @@ class ServerReadReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&reader_mu_);
       reader = reader_.load(std::memory_order_relaxed);
       if (reader == nullptr) {
-        send_initial_metadata_wanted_ = true;
+        backlog_.send_initial_metadata_wanted = true;
         return;
       }
     }
@@ -472,7 +481,7 @@ class ServerReadReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&reader_mu_);
       reader = reader_.load(std::memory_order_relaxed);
       if (reader == nullptr) {
-        read_wanted_ = req;
+        backlog_.read_wanted = req;
         return;
       }
     }
@@ -485,8 +494,8 @@ class ServerReadReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&reader_mu_);
       reader = reader_.load(std::memory_order_relaxed);
       if (reader == nullptr) {
-        finish_wanted_ = true;
-        status_wanted_ = std::move(s);
+        backlog_.finish_wanted = true;
+        backlog_.status_wanted = std::move(s);
         return;
       }
     }
@@ -506,30 +515,30 @@ class ServerReadReactor : public internal::ServerReactor {
   // customization point.
   virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
     grpc::internal::ReleasableMutexLock l(&reader_mu_);
+    PreBindBacklog ops(std::move(backlog_));
     reader_.store(reader, std::memory_order_release);
-    if (send_initial_metadata_wanted_) {
+    l.Unlock();
+
+    if (ops.send_initial_metadata_wanted) {
       reader->SendInitialMetadata();
-      send_initial_metadata_wanted_ = false;
     }
-    if (read_wanted_ != nullptr) {
-      reader->Read(read_wanted_);
-      read_wanted_ = nullptr;
+    if (ops.read_wanted != nullptr) {
+      reader->Read(ops.read_wanted);
     }
-    if (finish_wanted_) {
-      finish_wanted_ = false;
-      ::grpc::Status status_wanted = std::move(status_wanted_);
-      l.Unlock();
-      reader->Finish(std::move(status_wanted));
-      return;
+    if (ops.finish_wanted) {
+      reader->Finish(std::move(ops.status_wanted));
     }
   }
 
   grpc::internal::Mutex reader_mu_;
-  std::atomic<ServerCallbackReader<Request>*> reader_;
-  bool send_initial_metadata_wanted_ /* GUARDED_BY(reader_mu_) */ = false;
-  bool finish_wanted_ /* GUARDED_BY(reader_mu_) */ = false;
-  Request* read_wanted_ /* GUARDED_BY(reader_mu_) */ = nullptr;
-  ::grpc::Status status_wanted_ /* GUARDED_BY(reader_mu_) */;
+  std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
+  struct PreBindBacklog {
+    bool send_initial_metadata_wanted = false;
+    bool finish_wanted = false;
+    Request* read_wanted = nullptr;
+    ::grpc::Status status_wanted;
+  };
+  PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
 };
 
 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
@@ -547,7 +556,7 @@ class ServerWriteReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&writer_mu_);
       writer = writer_.load(std::memory_order_relaxed);
       if (writer == nullptr) {
-        send_initial_metadata_wanted_ = true;
+        backlog_.send_initial_metadata_wanted = true;
         return;
       }
     }
@@ -563,8 +572,8 @@ class ServerWriteReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&writer_mu_);
       writer = writer_.load(std::memory_order_relaxed);
       if (writer == nullptr) {
-        write_wanted_ = resp;
-        write_options_wanted_ = std::move(options);
+        backlog_.write_wanted = resp;
+        backlog_.write_options_wanted = std::move(options);
         return;
       }
     }
@@ -578,10 +587,10 @@ class ServerWriteReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&writer_mu_);
       writer = writer_.load(std::memory_order_relaxed);
       if (writer == nullptr) {
-        write_and_finish_wanted_ = true;
-        write_wanted_ = resp;
-        write_options_wanted_ = std::move(options);
-        status_wanted_ = std::move(s);
+        backlog_.write_and_finish_wanted = true;
+        backlog_.write_wanted = resp;
+        backlog_.write_options_wanted = std::move(options);
+        backlog_.status_wanted = std::move(s);
         return;
       }
     }
@@ -597,8 +606,8 @@ class ServerWriteReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&writer_mu_);
       writer = writer_.load(std::memory_order_relaxed);
       if (writer == nullptr) {
-        finish_wanted_ = true;
-        status_wanted_ = std::move(s);
+        backlog_.finish_wanted = true;
+        backlog_.status_wanted = std::move(s);
         return;
       }
     }
@@ -617,44 +626,38 @@ class ServerWriteReactor : public internal::ServerReactor {
   // customization point.
   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
     grpc::internal::ReleasableMutexLock l(&writer_mu_);
+    PreBindBacklog ops(std::move(backlog_));
     writer_.store(writer, std::memory_order_release);
-    if (send_initial_metadata_wanted_) {
+    l.Unlock();
+
+    if (ops.send_initial_metadata_wanted) {
       writer->SendInitialMetadata();
-      send_initial_metadata_wanted_ = false;
     }
-    if (write_and_finish_wanted_) {
-      write_and_finish_wanted_ = false;
-      const Response* write_wanted = write_wanted_;
-      ::grpc::WriteOptions write_options_wanted =
-          std::move(write_options_wanted_);
-      ::grpc::Status status_wanted = std::move(status_wanted_);
-      l.Unlock();
-      writer->WriteAndFinish(write_wanted, std::move(write_options_wanted),
-                             std::move(status_wanted));
-      return;
+    if (ops.write_and_finish_wanted) {
+      writer->WriteAndFinish(ops.write_wanted,
+                             std::move(ops.write_options_wanted),
+                             std::move(ops.status_wanted));
     } else {
-      if (write_wanted_ != nullptr) {
-        writer->Write(write_wanted_, std::move(write_options_wanted_));
-        write_wanted_ = nullptr;
+      if (ops.write_wanted != nullptr) {
+        writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
       }
-      if (finish_wanted_) {
-        finish_wanted_ = false;
-        ::grpc::Status status_wanted = std::move(status_wanted_);
-        l.Unlock();
-        writer->Finish(std::move(status_wanted));
-        return;
+      if (ops.finish_wanted) {
+        writer->Finish(std::move(ops.status_wanted));
       }
     }
   }
 
   grpc::internal::Mutex writer_mu_;
-  std::atomic<ServerCallbackWriter<Response>*> writer_;
-  bool send_initial_metadata_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
-  bool write_and_finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
-  bool finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
-  const Response* write_wanted_ /* GUARDED_BY(writer_mu_) */ = nullptr;
-  ::grpc::WriteOptions write_options_wanted_ /* GUARDED_BY(writer_mu_) */;
-  ::grpc::Status status_wanted_ /* GUARDED_BY(writer_mu_) */;
+  std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
+  struct PreBindBacklog {
+    bool send_initial_metadata_wanted = false;
+    bool write_and_finish_wanted = false;
+    bool finish_wanted = false;
+    const Response* write_wanted = nullptr;
+    ::grpc::WriteOptions write_options_wanted;
+    ::grpc::Status status_wanted;
+  };
+  PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
 };
 
 class ServerUnaryReactor : public internal::ServerReactor {
@@ -669,7 +672,7 @@ class ServerUnaryReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&call_mu_);
       call = call_.load(std::memory_order_relaxed);
       if (call == nullptr) {
-        send_initial_metadata_wanted_ = true;
+        backlog_.send_initial_metadata_wanted = true;
         return;
       }
     }
@@ -681,8 +684,8 @@ class ServerUnaryReactor : public internal::ServerReactor {
       grpc::internal::MutexLock l(&call_mu_);
       call = call_.load(std::memory_order_relaxed);
       if (call == nullptr) {
-        finish_wanted_ = true;
-        status_wanted_ = std::move(s);
+        backlog_.finish_wanted = true;
+        backlog_.status_wanted = std::move(s);
         return;
       }
     }
@@ -700,25 +703,26 @@ class ServerUnaryReactor : public internal::ServerReactor {
   // customization point.
   virtual void InternalBindCall(ServerCallbackUnary* call) {
     grpc::internal::ReleasableMutexLock l(&call_mu_);
+    PreBindBacklog ops(std::move(backlog_));
     call_.store(call, std::memory_order_release);
-    if (send_initial_metadata_wanted_) {
+    l.Unlock();
+
+    if (ops.send_initial_metadata_wanted) {
       call->SendInitialMetadata();
-      send_initial_metadata_wanted_ = false;
     }
-    if (finish_wanted_) {
-      finish_wanted_ = false;
-      ::grpc::Status status_wanted = std::move(status_wanted_);
-      l.Unlock();
-      call->Finish(std::move(status_wanted));
-      return;
+    if (ops.finish_wanted) {
+      call->Finish(std::move(ops.status_wanted));
     }
   }
 
   grpc::internal::Mutex call_mu_;
-  std::atomic<ServerCallbackUnary*> call_;
-  bool send_initial_metadata_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
-  bool finish_wanted_ /* GUARDED_BY(writer_mu_) */ = false;
-  ::grpc::Status status_wanted_ /* GUARDED_BY(writer_mu_) */;
+  std::atomic<ServerCallbackUnary*> call_{nullptr};
+  struct PreBindBacklog {
+    bool send_initial_metadata_wanted = false;
+    bool finish_wanted = false;
+    ::grpc::Status status_wanted;
+  };
+  PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
 };
 
 namespace internal {