Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / include / grpcpp / impl / codegen / server_callback_impl.h
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #ifndef GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
19 #define GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H
20
21 #include <atomic>
22 #include <functional>
23 #include <type_traits>
24
25 #include <grpcpp/impl/codegen/call.h>
26 #include <grpcpp/impl/codegen/call_op_set.h>
27 #include <grpcpp/impl/codegen/callback_common.h>
28 #include <grpcpp/impl/codegen/config.h>
29 #include <grpcpp/impl/codegen/core_codegen_interface.h>
30 #include <grpcpp/impl/codegen/message_allocator.h>
31 #include <grpcpp/impl/codegen/status.h>
32
33 namespace grpc_impl {
34
35 // Declare base class of all reactors as internal
36 namespace internal {
37
38 // Forward declarations
39 template <class Request, class Response>
40 class CallbackUnaryHandler;
41 template <class Request, class Response>
42 class CallbackClientStreamingHandler;
43 template <class Request, class Response>
44 class CallbackServerStreamingHandler;
45 template <class Request, class Response>
46 class CallbackBidiHandler;
47
48 class ServerReactor {
49  public:
50   virtual ~ServerReactor() = default;
51   virtual void OnDone() = 0;
52   virtual void OnCancel() = 0;
53
54   // The following is not API. It is for internal use only and specifies whether
55   // all reactions of this Reactor can be run without an extra executor
56   // scheduling. This should only be used for internally-defined reactors with
57   // trivial reactions.
58   virtual bool InternalInlineable() { return false; }
59
60  private:
61   template <class Request, class Response>
62   friend class CallbackUnaryHandler;
63   template <class Request, class Response>
64   friend class CallbackClientStreamingHandler;
65   template <class Request, class Response>
66   friend class CallbackServerStreamingHandler;
67   template <class Request, class Response>
68   friend class CallbackBidiHandler;
69 };
70
71 /// The base class of ServerCallbackUnary etc.
72 class ServerCallbackCall {
73  public:
74   virtual ~ServerCallbackCall() {}
75
76   // This object is responsible for tracking when it is safe to call
77   // OnCancel. This function should not be called until after the method handler
78   // is done and the RPC has completed with a cancellation. This is tracked by
79   // counting how many of these conditions have been met and calling OnCancel
80   // when none remain unmet.
81
82   // Fast version called with known reactor passed in, used from derived
83   // classes, typically in non-cancel case
84   void MaybeCallOnCancel(ServerReactor* reactor) {
85     if (GPR_UNLIKELY(UnblockCancellation())) {
86       CallOnCancel(reactor);
87     }
88   }
89
90   // Slower version called from object that doesn't know the reactor a priori
91   // (such as the ServerContext CompletionOp which is formed before the
92   // reactor). This is used in cancel cases only, so it's ok to be slower and
93   // invoke a virtual function.
94   void MaybeCallOnCancel() {
95     if (GPR_UNLIKELY(UnblockCancellation())) {
96       CallOnCancel(reactor());
97     }
98   }
99
100  protected:
101   /// Increases the reference count
102   void Ref() { callbacks_outstanding_.fetch_add(1, std::memory_order_relaxed); }
103
104   /// Decreases the reference count and returns the previous value
105   int Unref() {
106     return callbacks_outstanding_.fetch_sub(1, std::memory_order_acq_rel);
107   }
108
109  private:
110   virtual ServerReactor* reactor() = 0;
111   virtual void MaybeDone() = 0;
112
113   // If the OnCancel reaction is inlineable, execute it inline. Otherwise send
114   // it to an executor.
115   void CallOnCancel(ServerReactor* reactor);
116
117   // Implement the cancellation constraint counter. Return true if OnCancel
118   // should be called, false otherwise.
119   bool UnblockCancellation() {
120     return on_cancel_conditions_remaining_.fetch_sub(
121                1, std::memory_order_acq_rel) == 1;
122   }
123
124   std::atomic_int on_cancel_conditions_remaining_{2};
125   std::atomic_int callbacks_outstanding_{
126       3};  // reserve for start, Finish, and CompletionOp
127 };
128
129 template <class Request, class Response>
130 class DefaultMessageHolder
131     : public ::grpc::experimental::MessageHolder<Request, Response> {
132  public:
133   DefaultMessageHolder() {
134     this->set_request(&request_obj_);
135     this->set_response(&response_obj_);
136   }
137   void Release() override {
138     // the object is allocated in the call arena.
139     this->~DefaultMessageHolder<Request, Response>();
140   }
141
142  private:
143   Request request_obj_;
144   Response response_obj_;
145 };
146
147 }  // namespace internal
148
149 // Forward declarations
150 class ServerUnaryReactor;
151 template <class Request>
152 class ServerReadReactor;
153 template <class Response>
154 class ServerWriteReactor;
155 template <class Request, class Response>
156 class ServerBidiReactor;
157
158 // NOTE: The actual call/stream object classes are provided as API only to
159 // support mocking. There are no implementations of these class interfaces in
160 // the API.
161 class ServerCallbackUnary : public internal::ServerCallbackCall {
162  public:
163   virtual ~ServerCallbackUnary() {}
164   virtual void Finish(::grpc::Status s) = 0;
165   virtual void SendInitialMetadata() = 0;
166
167  protected:
168   // Use a template rather than explicitly specifying ServerUnaryReactor to
169   // delay binding and avoid a circular forward declaration issue
170   template <class Reactor>
171   void BindReactor(Reactor* reactor) {
172     reactor->InternalBindCall(this);
173   }
174 };
175
176 template <class Request>
177 class ServerCallbackReader : public internal::ServerCallbackCall {
178  public:
179   virtual ~ServerCallbackReader() {}
180   virtual void Finish(::grpc::Status s) = 0;
181   virtual void SendInitialMetadata() = 0;
182   virtual void Read(Request* msg) = 0;
183
184  protected:
185   void BindReactor(ServerReadReactor<Request>* reactor) {
186     reactor->InternalBindReader(this);
187   }
188 };
189
190 template <class Response>
191 class ServerCallbackWriter : public internal::ServerCallbackCall {
192  public:
193   virtual ~ServerCallbackWriter() {}
194
195   virtual void Finish(::grpc::Status s) = 0;
196   virtual void SendInitialMetadata() = 0;
197   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
198   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
199                               ::grpc::Status s) = 0;
200
201  protected:
202   void BindReactor(ServerWriteReactor<Response>* reactor) {
203     reactor->InternalBindWriter(this);
204   }
205 };
206
207 template <class Request, class Response>
208 class ServerCallbackReaderWriter : public internal::ServerCallbackCall {
209  public:
210   virtual ~ServerCallbackReaderWriter() {}
211
212   virtual void Finish(::grpc::Status s) = 0;
213   virtual void SendInitialMetadata() = 0;
214   virtual void Read(Request* msg) = 0;
215   virtual void Write(const Response* msg, ::grpc::WriteOptions options) = 0;
216   virtual void WriteAndFinish(const Response* msg, ::grpc::WriteOptions options,
217                               ::grpc::Status s) = 0;
218
219  protected:
220   void BindReactor(ServerBidiReactor<Request, Response>* reactor) {
221     reactor->InternalBindStream(this);
222   }
223 };
224
225 // The following classes are the reactor interfaces that are to be implemented
226 // by the user, returned as the output parameter of the method handler for a
227 // callback method. Note that none of the classes are pure; all reactions have a
228 // default empty reaction so that the user class only needs to override those
229 // classes that it cares about.
230
231 /// \a ServerBidiReactor is the interface for a bidirectional streaming RPC.
232 template <class Request, class Response>
233 class ServerBidiReactor : public internal::ServerReactor {
234  public:
235   // NOTE: Initializing stream_ as a constructor initializer rather than a
236   //       default initializer because gcc-4.x requires a copy constructor for
237   //       default initializing a templated member, which isn't ok for atomic.
238   // TODO(vjpai): Switch to default constructor and default initializer when
239   //              gcc-4.x is no longer supported
240   ServerBidiReactor() : stream_(nullptr) {}
241   ~ServerBidiReactor() = default;
242
243   /// Send any initial metadata stored in the RPC context. If not invoked,
244   /// any initial metadata will be passed along with the first Write or the
245   /// Finish (if there are no writes).
246   void StartSendInitialMetadata() {
247     ServerCallbackReaderWriter<Request, Response>* stream =
248         stream_.load(std::memory_order_acquire);
249     if (stream == nullptr) {
250       grpc::internal::MutexLock l(&stream_mu_);
251       stream = stream_.load(std::memory_order_relaxed);
252       if (stream == nullptr) {
253         backlog_.send_initial_metadata_wanted = true;
254         return;
255       }
256     }
257     stream->SendInitialMetadata();
258   }
259
260   /// Initiate a read operation.
261   ///
262   /// \param[out] req Where to eventually store the read message. Valid when
263   ///                 the library calls OnReadDone
264   void StartRead(Request* req) {
265     ServerCallbackReaderWriter<Request, Response>* stream =
266         stream_.load(std::memory_order_acquire);
267     if (stream == nullptr) {
268       grpc::internal::MutexLock l(&stream_mu_);
269       stream = stream_.load(std::memory_order_relaxed);
270       if (stream == nullptr) {
271         backlog_.read_wanted = req;
272         return;
273       }
274     }
275     stream->Read(req);
276   }
277
278   /// Initiate a write operation.
279   ///
280   /// \param[in] resp The message to be written. The library takes temporary
281   ///                 ownership until OnWriteDone, at which point the
282   ///                 application regains ownership of resp.
283   void StartWrite(const Response* resp) {
284     StartWrite(resp, ::grpc::WriteOptions());
285   }
286
287   /// Initiate a write operation with specified options.
288   ///
289   /// \param[in] resp The message to be written. The library takes temporary
290   ///                 ownership until OnWriteDone, at which point the
291   ///                 application regains ownership of resp.
292   /// \param[in] options The WriteOptions to use for writing this message
293   void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
294     ServerCallbackReaderWriter<Request, Response>* stream =
295         stream_.load(std::memory_order_acquire);
296     if (stream == nullptr) {
297       grpc::internal::MutexLock l(&stream_mu_);
298       stream = stream_.load(std::memory_order_relaxed);
299       if (stream == nullptr) {
300         backlog_.write_wanted = resp;
301         backlog_.write_options_wanted = std::move(options);
302         return;
303       }
304     }
305     stream->Write(resp, std::move(options));
306   }
307
308   /// Initiate a write operation with specified options and final RPC Status,
309   /// which also causes any trailing metadata for this RPC to be sent out.
310   /// StartWriteAndFinish is like merging StartWriteLast and Finish into a
311   /// single step. A key difference, though, is that this operation doesn't have
312   /// an OnWriteDone reaction - it is considered complete only when OnDone is
313   /// available. An RPC can either have StartWriteAndFinish or Finish, but not
314   /// both.
315   ///
316   /// \param[in] resp The message to be written. The library takes temporary
317   ///                 ownership until OnWriteDone, at which point the
318   ///                 application regains ownership of resp.
319   /// \param[in] options The WriteOptions to use for writing this message
320   /// \param[in] s The status outcome of this RPC
321   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
322                            ::grpc::Status s) {
323     ServerCallbackReaderWriter<Request, Response>* stream =
324         stream_.load(std::memory_order_acquire);
325     if (stream == nullptr) {
326       grpc::internal::MutexLock l(&stream_mu_);
327       stream = stream_.load(std::memory_order_relaxed);
328       if (stream == nullptr) {
329         backlog_.write_and_finish_wanted = true;
330         backlog_.write_wanted = resp;
331         backlog_.write_options_wanted = std::move(options);
332         backlog_.status_wanted = std::move(s);
333         return;
334       }
335     }
336     stream->WriteAndFinish(resp, std::move(options), std::move(s));
337   }
338
339   /// Inform system of a planned write operation with specified options, but
340   /// allow the library to schedule the actual write coalesced with the writing
341   /// of trailing metadata (which takes place on a Finish call).
342   ///
343   /// \param[in] resp The message to be written. The library takes temporary
344   ///                 ownership until OnWriteDone, at which point the
345   ///                 application regains ownership of resp.
346   /// \param[in] options The WriteOptions to use for writing this message
347   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
348     StartWrite(resp, std::move(options.set_last_message()));
349   }
350
351   /// Indicate that the stream is to be finished and the trailing metadata and
352   /// RPC status are to be sent. Every RPC MUST be finished using either Finish
353   /// or StartWriteAndFinish (but not both), even if the RPC is already
354   /// cancelled.
355   ///
356   /// \param[in] s The status outcome of this RPC
357   void Finish(::grpc::Status s) {
358     ServerCallbackReaderWriter<Request, Response>* stream =
359         stream_.load(std::memory_order_acquire);
360     if (stream == nullptr) {
361       grpc::internal::MutexLock l(&stream_mu_);
362       stream = stream_.load(std::memory_order_relaxed);
363       if (stream == nullptr) {
364         backlog_.finish_wanted = true;
365         backlog_.status_wanted = std::move(s);
366         return;
367       }
368     }
369     stream->Finish(std::move(s));
370   }
371
372   /// Notifies the application that an explicit StartSendInitialMetadata
373   /// operation completed. Not used when the sending of initial metadata
374   /// piggybacks onto the first write.
375   ///
376   /// \param[in] ok Was it successful? If false, no further write-side operation
377   ///               will succeed.
378   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
379
380   /// Notifies the application that a StartRead operation completed.
381   ///
382   /// \param[in] ok Was it successful? If false, no further read-side operation
383   ///               will succeed.
384   virtual void OnReadDone(bool /*ok*/) {}
385
386   /// Notifies the application that a StartWrite (or StartWriteLast) operation
387   /// completed.
388   ///
389   /// \param[in] ok Was it successful? If false, no further write-side operation
390   ///               will succeed.
391   virtual void OnWriteDone(bool /*ok*/) {}
392
393   /// Notifies the application that all operations associated with this RPC
394   /// have completed. This is an override (from the internal base class) but
395   /// still abstract, so derived classes MUST override it to be instantiated.
396   void OnDone() override = 0;
397
398   /// Notifies the application that this RPC has been cancelled. This is an
399   /// override (from the internal base class) but not final, so derived classes
400   /// should override it if they want to take action.
401   void OnCancel() override {}
402
403  private:
404   friend class ServerCallbackReaderWriter<Request, Response>;
405   // May be overridden by internal implementation details. This is not a public
406   // customization point.
407   virtual void InternalBindStream(
408       ServerCallbackReaderWriter<Request, Response>* stream) {
409     // TODO(vjpai): When stream_or_backlog_ becomes a variant (see below), use
410     // a scoped MutexLock and std::swap stream_or_backlog_ with a variant that
411     // has stream, then std::get<PreBindBacklog> out of that after the lock.
412     // Do likewise with the remaining InternalBind* functions as well.
413     grpc::internal::ReleasableMutexLock l(&stream_mu_);
414     PreBindBacklog ops(std::move(backlog_));
415     stream_.store(stream, std::memory_order_release);
416     l.Unlock();
417
418     if (ops.send_initial_metadata_wanted) {
419       stream->SendInitialMetadata();
420     }
421     if (ops.read_wanted != nullptr) {
422       stream->Read(ops.read_wanted);
423     }
424     if (ops.write_and_finish_wanted) {
425       stream->WriteAndFinish(ops.write_wanted,
426                              std::move(ops.write_options_wanted),
427                              std::move(ops.status_wanted));
428     } else {
429       if (ops.write_wanted != nullptr) {
430         stream->Write(ops.write_wanted, std::move(ops.write_options_wanted));
431       }
432       if (ops.finish_wanted) {
433         stream->Finish(std::move(ops.status_wanted));
434       }
435     }
436   }
437
438   grpc::internal::Mutex stream_mu_;
439   // TODO(vjpai): Make stream_or_backlog_ into a std::variant or absl::variant
440   //              once C++17 or ABSL is supported since stream and backlog are
441   //              mutually exclusive in this class. Do likewise with the
442   //              remaining reactor classes and their backlogs as well.
443   std::atomic<ServerCallbackReaderWriter<Request, Response>*> stream_{nullptr};
444   struct PreBindBacklog {
445     bool send_initial_metadata_wanted = false;
446     bool write_and_finish_wanted = false;
447     bool finish_wanted = false;
448     Request* read_wanted = nullptr;
449     const Response* write_wanted = nullptr;
450     ::grpc::WriteOptions write_options_wanted;
451     ::grpc::Status status_wanted;
452   };
453   PreBindBacklog backlog_ /* GUARDED_BY(stream_mu_) */;
454 };
455
456 /// \a ServerReadReactor is the interface for a client-streaming RPC.
457 template <class Request>
458 class ServerReadReactor : public internal::ServerReactor {
459  public:
460   ServerReadReactor() : reader_(nullptr) {}
461   ~ServerReadReactor() = default;
462
463   /// The following operation initiations are exactly like ServerBidiReactor.
464   void StartSendInitialMetadata() {
465     ServerCallbackReader<Request>* reader =
466         reader_.load(std::memory_order_acquire);
467     if (reader == nullptr) {
468       grpc::internal::MutexLock l(&reader_mu_);
469       reader = reader_.load(std::memory_order_relaxed);
470       if (reader == nullptr) {
471         backlog_.send_initial_metadata_wanted = true;
472         return;
473       }
474     }
475     reader->SendInitialMetadata();
476   }
477   void StartRead(Request* req) {
478     ServerCallbackReader<Request>* reader =
479         reader_.load(std::memory_order_acquire);
480     if (reader == nullptr) {
481       grpc::internal::MutexLock l(&reader_mu_);
482       reader = reader_.load(std::memory_order_relaxed);
483       if (reader == nullptr) {
484         backlog_.read_wanted = req;
485         return;
486       }
487     }
488     reader->Read(req);
489   }
490   void Finish(::grpc::Status s) {
491     ServerCallbackReader<Request>* reader =
492         reader_.load(std::memory_order_acquire);
493     if (reader == nullptr) {
494       grpc::internal::MutexLock l(&reader_mu_);
495       reader = reader_.load(std::memory_order_relaxed);
496       if (reader == nullptr) {
497         backlog_.finish_wanted = true;
498         backlog_.status_wanted = std::move(s);
499         return;
500       }
501     }
502     reader->Finish(std::move(s));
503   }
504
505   /// The following notifications are exactly like ServerBidiReactor.
506   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
507   virtual void OnReadDone(bool /*ok*/) {}
508   void OnDone() override = 0;
509   void OnCancel() override {}
510
511  private:
512   friend class ServerCallbackReader<Request>;
513
514   // May be overridden by internal implementation details. This is not a public
515   // customization point.
516   virtual void InternalBindReader(ServerCallbackReader<Request>* reader) {
517     grpc::internal::ReleasableMutexLock l(&reader_mu_);
518     PreBindBacklog ops(std::move(backlog_));
519     reader_.store(reader, std::memory_order_release);
520     l.Unlock();
521
522     if (ops.send_initial_metadata_wanted) {
523       reader->SendInitialMetadata();
524     }
525     if (ops.read_wanted != nullptr) {
526       reader->Read(ops.read_wanted);
527     }
528     if (ops.finish_wanted) {
529       reader->Finish(std::move(ops.status_wanted));
530     }
531   }
532
533   grpc::internal::Mutex reader_mu_;
534   std::atomic<ServerCallbackReader<Request>*> reader_{nullptr};
535   struct PreBindBacklog {
536     bool send_initial_metadata_wanted = false;
537     bool finish_wanted = false;
538     Request* read_wanted = nullptr;
539     ::grpc::Status status_wanted;
540   };
541   PreBindBacklog backlog_ /* GUARDED_BY(reader_mu_) */;
542 };
543
544 /// \a ServerWriteReactor is the interface for a server-streaming RPC.
545 template <class Response>
546 class ServerWriteReactor : public internal::ServerReactor {
547  public:
548   ServerWriteReactor() : writer_(nullptr) {}
549   ~ServerWriteReactor() = default;
550
551   /// The following operation initiations are exactly like ServerBidiReactor.
552   void StartSendInitialMetadata() {
553     ServerCallbackWriter<Response>* writer =
554         writer_.load(std::memory_order_acquire);
555     if (writer == nullptr) {
556       grpc::internal::MutexLock l(&writer_mu_);
557       writer = writer_.load(std::memory_order_relaxed);
558       if (writer == nullptr) {
559         backlog_.send_initial_metadata_wanted = true;
560         return;
561       }
562     }
563     writer->SendInitialMetadata();
564   }
565   void StartWrite(const Response* resp) {
566     StartWrite(resp, ::grpc::WriteOptions());
567   }
568   void StartWrite(const Response* resp, ::grpc::WriteOptions options) {
569     ServerCallbackWriter<Response>* writer =
570         writer_.load(std::memory_order_acquire);
571     if (writer == nullptr) {
572       grpc::internal::MutexLock l(&writer_mu_);
573       writer = writer_.load(std::memory_order_relaxed);
574       if (writer == nullptr) {
575         backlog_.write_wanted = resp;
576         backlog_.write_options_wanted = std::move(options);
577         return;
578       }
579     }
580     writer->Write(resp, std::move(options));
581   }
582   void StartWriteAndFinish(const Response* resp, ::grpc::WriteOptions options,
583                            ::grpc::Status s) {
584     ServerCallbackWriter<Response>* writer =
585         writer_.load(std::memory_order_acquire);
586     if (writer == nullptr) {
587       grpc::internal::MutexLock l(&writer_mu_);
588       writer = writer_.load(std::memory_order_relaxed);
589       if (writer == nullptr) {
590         backlog_.write_and_finish_wanted = true;
591         backlog_.write_wanted = resp;
592         backlog_.write_options_wanted = std::move(options);
593         backlog_.status_wanted = std::move(s);
594         return;
595       }
596     }
597     writer->WriteAndFinish(resp, std::move(options), std::move(s));
598   }
599   void StartWriteLast(const Response* resp, ::grpc::WriteOptions options) {
600     StartWrite(resp, std::move(options.set_last_message()));
601   }
602   void Finish(::grpc::Status s) {
603     ServerCallbackWriter<Response>* writer =
604         writer_.load(std::memory_order_acquire);
605     if (writer == nullptr) {
606       grpc::internal::MutexLock l(&writer_mu_);
607       writer = writer_.load(std::memory_order_relaxed);
608       if (writer == nullptr) {
609         backlog_.finish_wanted = true;
610         backlog_.status_wanted = std::move(s);
611         return;
612       }
613     }
614     writer->Finish(std::move(s));
615   }
616
617   /// The following notifications are exactly like ServerBidiReactor.
618   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
619   virtual void OnWriteDone(bool /*ok*/) {}
620   void OnDone() override = 0;
621   void OnCancel() override {}
622
623  private:
624   friend class ServerCallbackWriter<Response>;
625   // May be overridden by internal implementation details. This is not a public
626   // customization point.
627   virtual void InternalBindWriter(ServerCallbackWriter<Response>* writer) {
628     grpc::internal::ReleasableMutexLock l(&writer_mu_);
629     PreBindBacklog ops(std::move(backlog_));
630     writer_.store(writer, std::memory_order_release);
631     l.Unlock();
632
633     if (ops.send_initial_metadata_wanted) {
634       writer->SendInitialMetadata();
635     }
636     if (ops.write_and_finish_wanted) {
637       writer->WriteAndFinish(ops.write_wanted,
638                              std::move(ops.write_options_wanted),
639                              std::move(ops.status_wanted));
640     } else {
641       if (ops.write_wanted != nullptr) {
642         writer->Write(ops.write_wanted, std::move(ops.write_options_wanted));
643       }
644       if (ops.finish_wanted) {
645         writer->Finish(std::move(ops.status_wanted));
646       }
647     }
648   }
649
650   grpc::internal::Mutex writer_mu_;
651   std::atomic<ServerCallbackWriter<Response>*> writer_{nullptr};
652   struct PreBindBacklog {
653     bool send_initial_metadata_wanted = false;
654     bool write_and_finish_wanted = false;
655     bool finish_wanted = false;
656     const Response* write_wanted = nullptr;
657     ::grpc::WriteOptions write_options_wanted;
658     ::grpc::Status status_wanted;
659   };
660   PreBindBacklog backlog_ /* GUARDED_BY(writer_mu_) */;
661 };
662
663 class ServerUnaryReactor : public internal::ServerReactor {
664  public:
665   ServerUnaryReactor() : call_(nullptr) {}
666   ~ServerUnaryReactor() = default;
667
668   /// The following operation initiations are exactly like ServerBidiReactor.
669   void StartSendInitialMetadata() {
670     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
671     if (call == nullptr) {
672       grpc::internal::MutexLock l(&call_mu_);
673       call = call_.load(std::memory_order_relaxed);
674       if (call == nullptr) {
675         backlog_.send_initial_metadata_wanted = true;
676         return;
677       }
678     }
679     call->SendInitialMetadata();
680   }
681   void Finish(::grpc::Status s) {
682     ServerCallbackUnary* call = call_.load(std::memory_order_acquire);
683     if (call == nullptr) {
684       grpc::internal::MutexLock l(&call_mu_);
685       call = call_.load(std::memory_order_relaxed);
686       if (call == nullptr) {
687         backlog_.finish_wanted = true;
688         backlog_.status_wanted = std::move(s);
689         return;
690       }
691     }
692     call->Finish(std::move(s));
693   }
694
695   /// The following notifications are exactly like ServerBidiReactor.
696   virtual void OnSendInitialMetadataDone(bool /*ok*/) {}
697   void OnDone() override = 0;
698   void OnCancel() override {}
699
700  private:
701   friend class ServerCallbackUnary;
702   // May be overridden by internal implementation details. This is not a public
703   // customization point.
704   virtual void InternalBindCall(ServerCallbackUnary* call) {
705     grpc::internal::ReleasableMutexLock l(&call_mu_);
706     PreBindBacklog ops(std::move(backlog_));
707     call_.store(call, std::memory_order_release);
708     l.Unlock();
709
710     if (ops.send_initial_metadata_wanted) {
711       call->SendInitialMetadata();
712     }
713     if (ops.finish_wanted) {
714       call->Finish(std::move(ops.status_wanted));
715     }
716   }
717
718   grpc::internal::Mutex call_mu_;
719   std::atomic<ServerCallbackUnary*> call_{nullptr};
720   struct PreBindBacklog {
721     bool send_initial_metadata_wanted = false;
722     bool finish_wanted = false;
723     ::grpc::Status status_wanted;
724   };
725   PreBindBacklog backlog_ /* GUARDED_BY(call_mu_) */;
726 };
727
728 namespace internal {
729
730 template <class Base>
731 class FinishOnlyReactor : public Base {
732  public:
733   explicit FinishOnlyReactor(::grpc::Status s) { this->Finish(std::move(s)); }
734   void OnDone() override { this->~FinishOnlyReactor(); }
735 };
736
737 using UnimplementedUnaryReactor = FinishOnlyReactor<ServerUnaryReactor>;
738 template <class Request>
739 using UnimplementedReadReactor = FinishOnlyReactor<ServerReadReactor<Request>>;
740 template <class Response>
741 using UnimplementedWriteReactor =
742     FinishOnlyReactor<ServerWriteReactor<Response>>;
743 template <class Request, class Response>
744 using UnimplementedBidiReactor =
745     FinishOnlyReactor<ServerBidiReactor<Request, Response>>;
746
747 }  // namespace internal
748 }  // namespace grpc_impl
749
750 #endif  // GRPCPP_IMPL_CODEGEN_SERVER_CALLBACK_IMPL_H