3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #ifndef GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
20 #define GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H
22 #include <grpc/support/port_platform.h>
24 #include <grpc/support/log.h>
25 #include <grpcpp/alarm.h>
26 #include <grpcpp/grpcpp.h>
28 #include "src/core/lib/gprpp/sync.h"
29 #include "src/core/lib/gprpp/thd.h"
30 #include "src/cpp/server/load_reporter/load_reporter.h"
33 namespace load_reporter {
35 // Async load reporting service. It's mainly responsible for controlling the
36 // procedure of incoming requests. The real business logic is handed off to the
37 // LoadReporter. There should be at most one instance of this service on a
38 // server to avoid spreading the load data into multiple places.
//
// The service holds its completion queue, the LoadReporter, the worker thread
// object and the fetch-and-sample alarm through std::unique_ptr members (see
// the data members at the end of the class).
//
// NOTE(review): the numbers embedded at the start of each line in this copy
// skip values, so some declarations (and the closing brace of this class) are
// presumably missing from this view -- confirm against the upstream header
// before treating this text as the complete interface.
39 class LoadReporterAsyncServiceImpl
40 : public grpc::lb::v1::LoadReporter::AsyncService {
// Receives the completion queue `cq` by value as a std::unique_ptr, i.e. the
// caller hands over ownership (presumably stored in cq_ -- confirm in the
// .cc file). `explicit` prevents implicit conversion from the unique_ptr.
42 explicit LoadReporterAsyncServiceImpl(
43 std::unique_ptr<ServerCompletionQueue> cq);
44 ~LoadReporterAsyncServiceImpl() override;
46 // Starts the working thread.
// NOTE(review): the declaration the comment above describes is not visible in
// this copy -- confirm against upstream.
49 // Neither copyable nor movable.
// (User-declared copy operations suppress the implicit move operations.)
50 LoadReporterAsyncServiceImpl(const LoadReporterAsyncServiceImpl&) = delete;
51 LoadReporterAsyncServiceImpl& operator=(const LoadReporterAsyncServiceImpl&) =
// Forward declaration: the tag type below names ReportLoadHandler before the
// handler class itself is defined.
55 class ReportLoadHandler;
57 // A tag that can be called with a bool argument. It's tailored for
58 // ReportLoadHandler's use. Before being used, it should be constructed with a
59 // method of ReportLoadHandler and a shared pointer to the handler. The
60 // shared pointer will be moved to the invoked function and the function can
61 // only be invoked once. That makes ref counting of the handler easier,
62 // because the shared pointer is not bound to the function and can be gone
63 // once the invoked function returns (if not used any more).
// Signature of the handler method a tag dispatches to: it receives the shared
// pointer to the handler plus a bool.
66 using HandlerFunction =
67 std::function<void(std::shared_ptr<ReportLoadHandler>, bool)>;
// Builds a tag from `func` and `handler`. Both are taken by value and moved
// into the members; GPR_ASSERT then checks that neither member is null.
71 CallableTag(HandlerFunction func,
72 std::shared_ptr<ReportLoadHandler> handler)
73 : handler_function_(std::move(func)), handler_(std::move(handler)) {
74 GPR_ASSERT(handler_function_ != nullptr);
75 GPR_ASSERT(handler_ != nullptr);
78 // Runs the tag. This should be called only once. The handler is no longer
79 // owned by this tag after this method is invoked.
// NOTE(review): the declaration the two lines above describe is not visible
// in this copy -- confirm against upstream.
82 // Releases and returns the shared pointer to the handler.
// handler_ is moved out, so this tag no longer holds a reference afterwards.
83 std::shared_ptr<ReportLoadHandler> ReleaseHandler() {
84 return std::move(handler_);
// The function to invoke; defaults to null until set by the two-argument
// constructor.
88 HandlerFunction handler_function_ = nullptr;
// The handler this tag currently holds; given up by ReleaseHandler().
89 std::shared_ptr<ReportLoadHandler> handler_;
92 // Each handler takes care of one load reporting stream. It contains
93 // per-stream data and it will access the members of the parent class (i.e.,
94 // LoadReporterAsyncServiceImpl) for service-wide data (e.g., the load data).
95 class ReportLoadHandler {
97 // Instantiates a ReportLoadHandler and requests the next load reporting
98 // call. The handler object will manage its own lifetime, so no action is
99 // needed from the caller any more regarding that object.
// All three arguments are raw pointers; see the matching members below.
100 static void CreateAndStart(ServerCompletionQueue* cq,
101 LoadReporterAsyncServiceImpl* service,
102 LoadReporter* load_reporter);
104 // This ctor is public because we want to use std::make_shared<> in
105 // CreateAndStart(). This ctor shouldn't be used elsewhere.
106 ReportLoadHandler(ServerCompletionQueue* cq,
107 LoadReporterAsyncServiceImpl* service,
108 LoadReporter* load_reporter);
// The callbacks below (except Shutdown) match CallableTag::HandlerFunction:
// `self` is the shared pointer to this handler handed over by the tag, and
// `ok` is the bool the tag was run with (presumably the completion-queue
// success flag -- confirm).
111 // After the handler has a call request delivered, it starts reading the
112 // initial request. Also, a new handler is spawned so that we can keep
113 // servicing future calls.
114 void OnRequestDelivered(std::shared_ptr<ReportLoadHandler> self, bool ok);
116 // The first Read() is expected to succeed, after which the handler starts
117 // sending load reports back to the balancer. The second Read() is
118 // expected to fail, which happens when the balancer half-closes the
119 // stream to signal that it's no longer interested in the load reports. For
120 // the latter case, the handler will then close the stream.
121 void OnReadDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
123 // The report sending operations are sequential as: send report -> send
124 // done, schedule the next send -> waiting for the alarm to fire -> alarm
125 // fires, send report -> ...
126 void SendReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
127 void ScheduleNextReport(std::shared_ptr<ReportLoadHandler> self, bool ok);
129 // Called when Finish() is done.
130 void OnFinishDone(std::shared_ptr<ReportLoadHandler> self, bool ok);
132 // Called when AsyncNotifyWhenDone() notifies us.
133 void OnDoneNotified(std::shared_ptr<ReportLoadHandler> self, bool ok);
// Shuts the stream down. `reason` is a C string, presumably used only for
// logging -- confirm in the .cc file.
135 void Shutdown(std::shared_ptr<ReportLoadHandler> self, const char* reason);
137 // The key fields of the stream.
139 std::string load_balanced_hostname_;
140 std::string load_key_;
// Reporting interval; milliseconds according to the `_ms` suffix.
141 uint64_t load_report_interval_ms_;
143 // The data for RPC communication with the load reportee.
145 ::grpc::lb::v1::LoadReportRequest request_;
147 // The members passed down from LoadReporterAsyncServiceImpl.
// Raw pointers: presumably non-owning, with the service expected to outlive
// every handler -- confirm.
148 ServerCompletionQueue* cq_;
149 LoadReporterAsyncServiceImpl* service_;
150 LoadReporter* load_reporter_;
151 ServerAsyncReaderWriter<::grpc::lb::v1::LoadReportResponse,
152 ::grpc::lb::v1::LoadReportRequest>
155 // The status of the RPC progress.
157 WAITING_FOR_DELIVERY,
159 INITIAL_REQUEST_RECEIVED,
160 INITIAL_RESPONSE_SENT,
// Per-stream state flags; each starts out false.
163 bool shutdown_{false};
164 bool done_notified_{false};
165 bool is_cancelled_{false};
// Tags held by the handler. NOTE(review): the names suggest one tag each for
// the done notification, Finish(), the next inbound operation and the next
// outbound operation -- confirm against the .cc file.
166 CallableTag on_done_notified_;
167 CallableTag on_finish_done_;
168 CallableTag next_inbound_;
169 CallableTag next_outbound_;
// Presumably the alarm the report-sending sequence above waits on -- confirm.
170 std::unique_ptr<Alarm> next_report_alarm_;
173 // Handles the incoming requests and drives the completion queue in a loop.
// Static and takes a void*: presumably the thread entry point, with `arg`
// being the service instance -- confirm.
174 static void Work(void* arg);
176 // Schedules the next data fetching from Census and LB feedback sampling.
177 void ScheduleNextFetchAndSample();
179 // Fetches data from Census and samples LB feedback.
// `ok` is presumably the completion result of the fetch-and-sample alarm --
// confirm.
180 void FetchAndSample(bool ok);
// Completion queue held by the service.
182 std::unique_ptr<ServerCompletionQueue> cq_;
183 // To synchronize the operations related to shutdown state of cq_, so that we
184 // don't enqueue new tags into cq_ after it is already shut down.
185 grpc_core::Mutex cq_shutdown_mu_;
// Atomic service-wide shutdown flag; starts out false.
186 std::atomic_bool shutdown_{false};
// Worker thread object, the LoadReporter and the alarm used for the periodic
// fetch-and-sample step, all held by unique_ptr.
187 std::unique_ptr<::grpc_core::Thread> thread_;
188 std::unique_ptr<LoadReporter> load_reporter_;
189 std::unique_ptr<Alarm> next_fetch_and_sample_alarm_;
192 } // namespace load_reporter
195 #endif // GRPC_SRC_CPP_SERVER_LOAD_REPORTER_ASYNC_SERVICE_IMPL_H