Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / lb_policy / subchannel_list.h
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #ifndef GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
20 #define GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H
21
22 #include <grpc/support/port_platform.h>
23
24 #include <string.h>
25
26 #include <grpc/support/alloc.h>
27
28 #include "absl/container/inlined_vector.h"
29
30 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
31 #include "src/core/ext/filters/client_channel/server_address.h"
32 // TODO(roth): Should not need the include of subchannel.h here, since
33 // that implementation should be hidden from the LB policy API.
34 #include "src/core/ext/filters/client_channel/subchannel.h"
35 #include "src/core/ext/filters/client_channel/subchannel_interface.h"
36 #include "src/core/lib/channel/channel_args.h"
37 #include "src/core/lib/debug/trace.h"
38 #include "src/core/lib/gprpp/orphanable.h"
39 #include "src/core/lib/gprpp/ref_counted.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/closure.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44
45 // Code for maintaining a list of subchannels within an LB policy.
46 //
47 // To use this, callers must create their own subclasses, like so:
48 /*
49
50 class MySubchannelList;  // Forward declaration.
51
52 class MySubchannelData
53     : public SubchannelData<MySubchannelList, MySubchannelData> {
54  public:
55   void ProcessConnectivityChangeLocked(
56       grpc_connectivity_state connectivity_state) override {
57     // ...code to handle connectivity changes...
58   }
59 };
60
61 class MySubchannelList
62     : public SubchannelList<MySubchannelList, MySubchannelData> {
63 };
64
65 */
66 // All methods will be called from within the client_channel work serializer.
67
68 namespace grpc_core {
69
70 // Forward declaration.
71 template <typename SubchannelListType, typename SubchannelDataType>
72 class SubchannelList;
73
74 // Stores data for a particular subchannel in a subchannel list.
75 // Callers must create a subclass that implements the
76 // ProcessConnectivityChangeLocked() method.
77 template <typename SubchannelListType, typename SubchannelDataType>
78 class SubchannelData {
79  public:
80   // Returns a pointer to the subchannel list containing this object.
81   SubchannelListType* subchannel_list() const {
82     return static_cast<SubchannelListType*>(subchannel_list_);
83   }
84
85   // Returns the index into the subchannel list of this object.
86   size_t Index() const {
87     return static_cast<size_t>(static_cast<const SubchannelDataType*>(this) -
88                                subchannel_list_->subchannel(0));
89   }
90
91   // Returns a pointer to the subchannel.
92   SubchannelInterface* subchannel() const { return subchannel_.get(); }
93
94   // Synchronously checks the subchannel's connectivity state.
95   // Must not be called while there is a connectivity notification
96   // pending (i.e., between calling StartConnectivityWatchLocked() and
97   // calling CancelConnectivityWatchLocked()).
98   grpc_connectivity_state CheckConnectivityStateLocked() {
99     GPR_ASSERT(pending_watcher_ == nullptr);
100     connectivity_state_ = subchannel_->CheckConnectivityState();
101     return connectivity_state_;
102   }
103
104   // Resets the connection backoff.
105   // TODO(roth): This method should go away when we move the backoff
106   // code out of the subchannel and into the LB policies.
107   void ResetBackoffLocked();
108
109   // Starts watching the connectivity state of the subchannel.
110   // ProcessConnectivityChangeLocked() will be called whenever the
111   // connectivity state changes.
112   void StartConnectivityWatchLocked();
113
114   // Cancels watching the connectivity state of the subchannel.
115   void CancelConnectivityWatchLocked(const char* reason);
116
117   // Cancels any pending connectivity watch and unrefs the subchannel.
118   void ShutdownLocked();
119
120  protected:
121   SubchannelData(
122       SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
123       const ServerAddress& address,
124       RefCountedPtr<SubchannelInterface> subchannel);
125
126   virtual ~SubchannelData();
127
128   // After StartConnectivityWatchLocked() is called, this method will be
129   // invoked whenever the subchannel's connectivity state changes.
130   // To stop watching, use CancelConnectivityWatchLocked().
131   virtual void ProcessConnectivityChangeLocked(
132       grpc_connectivity_state connectivity_state) = 0;
133
134  private:
135   // Watcher for subchannel connectivity state.
136   class Watcher
137       : public SubchannelInterface::ConnectivityStateWatcherInterface {
138    public:
139     Watcher(
140         SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data,
141         RefCountedPtr<SubchannelListType> subchannel_list)
142         : subchannel_data_(subchannel_data),
143           subchannel_list_(std::move(subchannel_list)) {}
144
145     ~Watcher() { subchannel_list_.reset(DEBUG_LOCATION, "Watcher dtor"); }
146
147     void OnConnectivityStateChange(grpc_connectivity_state new_state) override;
148
149     grpc_pollset_set* interested_parties() override {
150       return subchannel_list_->policy()->interested_parties();
151     }
152
153    private:
154     SubchannelData<SubchannelListType, SubchannelDataType>* subchannel_data_;
155     RefCountedPtr<SubchannelListType> subchannel_list_;
156   };
157
158   // Unrefs the subchannel.
159   void UnrefSubchannelLocked(const char* reason);
160
161   // Backpointer to owning subchannel list.  Not owned.
162   SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list_;
163   // The subchannel.
164   RefCountedPtr<SubchannelInterface> subchannel_;
165   // Will be non-null when the subchannel's state is being watched.
166   SubchannelInterface::ConnectivityStateWatcherInterface* pending_watcher_ =
167       nullptr;
168   // Data updated by the watcher.
169   grpc_connectivity_state connectivity_state_;
170 };
171
172 // A list of subchannels.
173 template <typename SubchannelListType, typename SubchannelDataType>
174 class SubchannelList : public InternallyRefCounted<SubchannelListType> {
175  public:
176   typedef absl::InlinedVector<SubchannelDataType, 10> SubchannelVector;
177
178   // The number of subchannels in the list.
179   size_t num_subchannels() const { return subchannels_.size(); }
180
181   // The data for the subchannel at a particular index.
182   SubchannelDataType* subchannel(size_t index) { return &subchannels_[index]; }
183
184   // Returns true if the subchannel list is shutting down.
185   bool shutting_down() const { return shutting_down_; }
186
187   // Accessors.
188   LoadBalancingPolicy* policy() const { return policy_; }
189   TraceFlag* tracer() const { return tracer_; }
190
191   // Resets connection backoff of all subchannels.
192   // TODO(roth): We will probably need to rethink this as part of moving
193   // the backoff code out of subchannels and into LB policies.
194   void ResetBackoffLocked();
195
196   void Orphan() override {
197     ShutdownLocked();
198     InternallyRefCounted<SubchannelListType>::Unref(DEBUG_LOCATION, "shutdown");
199   }
200
201  protected:
202   SubchannelList(LoadBalancingPolicy* policy, TraceFlag* tracer,
203                  ServerAddressList addresses,
204                  LoadBalancingPolicy::ChannelControlHelper* helper,
205                  const grpc_channel_args& args);
206
207   virtual ~SubchannelList();
208
209  private:
210   // For accessing Ref() and Unref().
211   friend class SubchannelData<SubchannelListType, SubchannelDataType>;
212
213   void ShutdownLocked();
214
215   // Backpointer to owning policy.
216   LoadBalancingPolicy* policy_;
217
218   TraceFlag* tracer_;
219
220   // The list of subchannels.
221   SubchannelVector subchannels_;
222
223   // Is this list shutting down? This may be true due to the shutdown of the
224   // policy itself or because a newer update has arrived while this one hadn't
225   // finished processing.
226   bool shutting_down_ = false;
227 };
228
229 //
230 // implementation -- no user-serviceable parts below
231 //
232
233 //
234 // SubchannelData::Watcher
235 //
236
237 template <typename SubchannelListType, typename SubchannelDataType>
238 void SubchannelData<SubchannelListType, SubchannelDataType>::Watcher::
239     OnConnectivityStateChange(grpc_connectivity_state new_state) {
240   if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
241     gpr_log(GPR_INFO,
242             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
243             " (subchannel %p): connectivity changed: state=%s, "
244             "shutting_down=%d, pending_watcher=%p",
245             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
246             subchannel_list_.get(), subchannel_data_->Index(),
247             subchannel_list_->num_subchannels(),
248             subchannel_data_->subchannel_.get(),
249             ConnectivityStateName(new_state), subchannel_list_->shutting_down(),
250             subchannel_data_->pending_watcher_);
251   }
252   if (!subchannel_list_->shutting_down() &&
253       subchannel_data_->pending_watcher_ != nullptr) {
254     subchannel_data_->connectivity_state_ = new_state;
255     // Call the subclass's ProcessConnectivityChangeLocked() method.
256     subchannel_data_->ProcessConnectivityChangeLocked(new_state);
257   }
258 }
259
260 //
261 // SubchannelData
262 //
263
264 template <typename SubchannelListType, typename SubchannelDataType>
265 SubchannelData<SubchannelListType, SubchannelDataType>::SubchannelData(
266     SubchannelList<SubchannelListType, SubchannelDataType>* subchannel_list,
267     const ServerAddress& /*address*/,
268     RefCountedPtr<SubchannelInterface> subchannel)
269     : subchannel_list_(subchannel_list),
270       subchannel_(std::move(subchannel)),
271       // We assume that the current state is IDLE.  If not, we'll get a
272       // callback telling us that.
273       connectivity_state_(GRPC_CHANNEL_IDLE) {}
274
// Destructor.  Asserts that the subchannel has already been released; in
// this class that happens in UnrefSubchannelLocked(), which ShutdownLocked()
// calls.
template <typename SubchannelListType, typename SubchannelDataType>
SubchannelData<SubchannelListType, SubchannelDataType>::~SubchannelData() {
  GPR_ASSERT(subchannel_ == nullptr);
}
279
280 template <typename SubchannelListType, typename SubchannelDataType>
281 void SubchannelData<SubchannelListType, SubchannelDataType>::
282     UnrefSubchannelLocked(const char* reason) {
283   if (subchannel_ != nullptr) {
284     if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
285       gpr_log(GPR_INFO,
286               "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
287               " (subchannel %p): unreffing subchannel (%s)",
288               subchannel_list_->tracer()->name(), subchannel_list_->policy(),
289               subchannel_list_, Index(), subchannel_list_->num_subchannels(),
290               subchannel_.get(), reason);
291     }
292     subchannel_.reset();
293   }
294 }
295
296 template <typename SubchannelListType, typename SubchannelDataType>
297 void SubchannelData<SubchannelListType,
298                     SubchannelDataType>::ResetBackoffLocked() {
299   if (subchannel_ != nullptr) {
300     subchannel_->ResetBackoff();
301   }
302 }
303
304 template <typename SubchannelListType, typename SubchannelDataType>
305 void SubchannelData<SubchannelListType,
306                     SubchannelDataType>::StartConnectivityWatchLocked() {
307   if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
308     gpr_log(GPR_INFO,
309             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
310             " (subchannel %p): starting watch (from %s)",
311             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
312             subchannel_list_, Index(), subchannel_list_->num_subchannels(),
313             subchannel_.get(), ConnectivityStateName(connectivity_state_));
314   }
315   GPR_ASSERT(pending_watcher_ == nullptr);
316   pending_watcher_ =
317       new Watcher(this, subchannel_list()->Ref(DEBUG_LOCATION, "Watcher"));
318   subchannel_->WatchConnectivityState(
319       connectivity_state_,
320       std::unique_ptr<SubchannelInterface::ConnectivityStateWatcherInterface>(
321           pending_watcher_));
322 }
323
324 template <typename SubchannelListType, typename SubchannelDataType>
325 void SubchannelData<SubchannelListType, SubchannelDataType>::
326     CancelConnectivityWatchLocked(const char* reason) {
327   if (GRPC_TRACE_FLAG_ENABLED(*subchannel_list_->tracer())) {
328     gpr_log(GPR_INFO,
329             "[%s %p] subchannel list %p index %" PRIuPTR " of %" PRIuPTR
330             " (subchannel %p): canceling connectivity watch (%s)",
331             subchannel_list_->tracer()->name(), subchannel_list_->policy(),
332             subchannel_list_, Index(), subchannel_list_->num_subchannels(),
333             subchannel_.get(), reason);
334   }
335   if (pending_watcher_ != nullptr) {
336     subchannel_->CancelConnectivityStateWatch(pending_watcher_);
337     pending_watcher_ = nullptr;
338   }
339 }
340
341 template <typename SubchannelListType, typename SubchannelDataType>
342 void SubchannelData<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
343   if (pending_watcher_ != nullptr) CancelConnectivityWatchLocked("shutdown");
344   UnrefSubchannelLocked("shutdown");
345 }
346
347 //
348 // SubchannelList
349 //
350
351 template <typename SubchannelListType, typename SubchannelDataType>
352 SubchannelList<SubchannelListType, SubchannelDataType>::SubchannelList(
353     LoadBalancingPolicy* policy, TraceFlag* tracer, ServerAddressList addresses,
354     LoadBalancingPolicy::ChannelControlHelper* helper,
355     const grpc_channel_args& args)
356     : InternallyRefCounted<SubchannelListType>(tracer),
357       policy_(policy),
358       tracer_(tracer) {
359   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
360     gpr_log(GPR_INFO,
361             "[%s %p] Creating subchannel list %p for %" PRIuPTR " subchannels",
362             tracer_->name(), policy, this, addresses.size());
363   }
364   subchannels_.reserve(addresses.size());
365   // Create a subchannel for each address.
366   for (const ServerAddress& address : addresses) {
367     RefCountedPtr<SubchannelInterface> subchannel =
368         helper->CreateSubchannel(std::move(address), args);
369     if (subchannel == nullptr) {
370       // Subchannel could not be created.
371       if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
372         gpr_log(GPR_INFO,
373                 "[%s %p] could not create subchannel for address %s, "
374                 "ignoring",
375                 tracer_->name(), policy_, address.ToString().c_str());
376       }
377       continue;
378     }
379     if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
380       gpr_log(GPR_INFO,
381               "[%s %p] subchannel list %p index %" PRIuPTR
382               ": Created subchannel %p for address %s",
383               tracer_->name(), policy_, this, subchannels_.size(),
384               subchannel.get(), address.ToString().c_str());
385     }
386     subchannels_.emplace_back(this, address, std::move(subchannel));
387   }
388 }
389
390 template <typename SubchannelListType, typename SubchannelDataType>
391 SubchannelList<SubchannelListType, SubchannelDataType>::~SubchannelList() {
392   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
393     gpr_log(GPR_INFO, "[%s %p] Destroying subchannel_list %p", tracer_->name(),
394             policy_, this);
395   }
396 }
397
398 template <typename SubchannelListType, typename SubchannelDataType>
399 void SubchannelList<SubchannelListType, SubchannelDataType>::ShutdownLocked() {
400   if (GRPC_TRACE_FLAG_ENABLED(*tracer_)) {
401     gpr_log(GPR_INFO, "[%s %p] Shutting down subchannel_list %p",
402             tracer_->name(), policy_, this);
403   }
404   GPR_ASSERT(!shutting_down_);
405   shutting_down_ = true;
406   for (size_t i = 0; i < subchannels_.size(); i++) {
407     SubchannelDataType* sd = &subchannels_[i];
408     sd->ShutdownLocked();
409   }
410 }
411
412 template <typename SubchannelListType, typename SubchannelDataType>
413 void SubchannelList<SubchannelListType,
414                     SubchannelDataType>::ResetBackoffLocked() {
415   for (size_t i = 0; i < subchannels_.size(); i++) {
416     SubchannelDataType* sd = &subchannels_[i];
417     sd->ResetBackoffLocked();
418   }
419 }
420
421 }  // namespace grpc_core
422
423 #endif /* GRPC_CORE_EXT_FILTERS_CLIENT_CHANNEL_LB_POLICY_SUBCHANNEL_LIST_H */