d3faaaddc98ffc297de3d478aafbd6f163f91409
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / lb_policy / round_robin / round_robin.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
/** Round Robin Policy.
 *
 * Each picker created by this policy holds the connected (READY) subchannels
 * of the current subchannel list, in the relative order of the addresses
 * provided upon creation or updates, and hands them out in turn on successive
 * picks. Every new picker (e.g. after an update or a subchannel state change)
 * starts at a randomly chosen position instead of at the beginning of the
 * list. */
26
27 #include <grpc/support/port_platform.h>
28
29 #include <stdlib.h>
30 #include <string.h>
31
32 #include <grpc/support/alloc.h>
33
34 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
35 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
36 #include "src/core/ext/filters/client_channel/subchannel.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/mutex_lock.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/combiner.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44 #include "src/core/lib/transport/static_metadata.h"
45
46 namespace grpc_core {
47
48 TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
49
50 namespace {
51
52 //
53 // round_robin LB policy
54 //
55
// Policy name returned by both RoundRobin::name() and RoundRobinFactory::name().
constexpr char kRoundRobin[]{"round_robin"};
57
58 class RoundRobin : public LoadBalancingPolicy {
59  public:
60   explicit RoundRobin(Args args);
61
62   const char* name() const override { return kRoundRobin; }
63
64   void UpdateLocked(UpdateArgs args) override;
65   void ResetBackoffLocked() override;
66   void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
67                                 channelz::ChildRefsList* ignored) override;
68
69  private:
70   ~RoundRobin();
71
72   // Forward declaration.
73   class RoundRobinSubchannelList;
74
75   // Data for a particular subchannel in a subchannel list.
76   // This subclass adds the following functionality:
77   // - Tracks the previous connectivity state of the subchannel, so that
78   //   we know how many subchannels are in each state.
79   class RoundRobinSubchannelData
80       : public SubchannelData<RoundRobinSubchannelList,
81                               RoundRobinSubchannelData> {
82    public:
83     RoundRobinSubchannelData(
84         SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
85             subchannel_list,
86         const ServerAddress& address, Subchannel* subchannel,
87         grpc_combiner* combiner)
88         : SubchannelData(subchannel_list, address, subchannel, combiner) {}
89
90     grpc_connectivity_state connectivity_state() const {
91       return last_connectivity_state_;
92     }
93
94     void UpdateConnectivityStateLocked(
95         grpc_connectivity_state connectivity_state, grpc_error* error);
96
97    private:
98     void ProcessConnectivityChangeLocked(
99         grpc_connectivity_state connectivity_state, grpc_error* error) override;
100
101     grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
102   };
103
104   // A list of subchannels.
105   class RoundRobinSubchannelList
106       : public SubchannelList<RoundRobinSubchannelList,
107                               RoundRobinSubchannelData> {
108    public:
109     RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
110                              const ServerAddressList& addresses,
111                              grpc_combiner* combiner,
112                              const grpc_channel_args& args)
113         : SubchannelList(policy, tracer, addresses, combiner,
114                          policy->channel_control_helper(), args) {
115       // Need to maintain a ref to the LB policy as long as we maintain
116       // any references to subchannels, since the subchannels'
117       // pollset_sets will include the LB policy's pollset_set.
118       policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
119     }
120
121     ~RoundRobinSubchannelList() {
122       GRPC_ERROR_UNREF(last_transient_failure_error_);
123       RoundRobin* p = static_cast<RoundRobin*>(policy());
124       p->Unref(DEBUG_LOCATION, "subchannel_list");
125     }
126
127     // Starts watching the subchannels in this list.
128     void StartWatchingLocked();
129
130     // Updates the counters of subchannels in each state when a
131     // subchannel transitions from old_state to new_state.
132     // transient_failure_error is the error that is reported when
133     // new_state is TRANSIENT_FAILURE.
134     void UpdateStateCountersLocked(grpc_connectivity_state old_state,
135                                    grpc_connectivity_state new_state,
136                                    grpc_error* transient_failure_error);
137
138     // If this subchannel list is the RR policy's current subchannel
139     // list, updates the RR policy's connectivity state based on the
140     // subchannel list's state counters.
141     void MaybeUpdateRoundRobinConnectivityStateLocked();
142
143     // Updates the RR policy's overall state based on the counters of
144     // subchannels in each state.
145     void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
146
147    private:
148     size_t num_ready_ = 0;
149     size_t num_connecting_ = 0;
150     size_t num_transient_failure_ = 0;
151     grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
152   };
153
154   class Picker : public SubchannelPicker {
155    public:
156     Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
157
158     PickResult Pick(PickArgs* pick, grpc_error** error) override;
159
160    private:
161     // Using pointer value only, no ref held -- do not dereference!
162     RoundRobin* parent_;
163
164     size_t last_picked_index_;
165     InlinedVector<RefCountedPtr<ConnectedSubchannel>, 10> subchannels_;
166   };
167
168   // Helper class to ensure that any function that modifies the child refs
169   // data structures will update the channelz snapshot data structures before
170   // returning.
171   class AutoChildRefsUpdater {
172    public:
173     explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
174     ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
175
176    private:
177     RoundRobin* rr_;
178   };
179
180   void ShutdownLocked() override;
181
182   void UpdateChildRefsLocked();
183
184   /** list of subchannels */
185   OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
186   /** Latest version of the subchannel list.
187    * Subchannel connectivity callbacks will only promote updated subchannel
188    * lists if they equal \a latest_pending_subchannel_list. In other words,
189    * racing callbacks that reference outdated subchannel lists won't perform any
190    * update. */
191   OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
192   /** are we shutting down? */
193   bool shutdown_ = false;
194   /// Lock and data used to capture snapshots of this channel's child
195   /// channels and subchannels. This data is consumed by channelz.
196   gpr_mu child_refs_mu_;
197   channelz::ChildRefsList child_subchannels_;
198   channelz::ChildRefsList child_channels_;
199 };
200
201 //
202 // RoundRobin::Picker
203 //
204
205 RoundRobin::Picker::Picker(RoundRobin* parent,
206                            RoundRobinSubchannelList* subchannel_list)
207     : parent_(parent) {
208   for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
209     auto* connected_subchannel =
210         subchannel_list->subchannel(i)->connected_subchannel();
211     if (connected_subchannel != nullptr) {
212       subchannels_.push_back(connected_subchannel->Ref());
213     }
214   }
215   // For discussion on why we generate a random starting index for
216   // the picker, see https://github.com/grpc/grpc-go/issues/2580.
217   // TODO(roth): rand(3) is not thread-safe.  This should be replaced with
218   // something better as part of https://github.com/grpc/grpc/issues/17891.
219   last_picked_index_ = rand() % subchannels_.size();
220   if (grpc_lb_round_robin_trace.enabled()) {
221     gpr_log(GPR_INFO,
222             "[RR %p picker %p] created picker from subchannel_list=%p "
223             "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR,
224             parent_, this, subchannel_list, subchannels_.size(),
225             last_picked_index_);
226   }
227 }
228
229 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
230                                                 grpc_error** error) {
231   last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
232   if (grpc_lb_round_robin_trace.enabled()) {
233     gpr_log(GPR_INFO,
234             "[RR %p picker %p] returning index %" PRIuPTR
235             ", connected_subchannel=%p",
236             parent_, this, last_picked_index_,
237             subchannels_[last_picked_index_].get());
238   }
239   pick->connected_subchannel = subchannels_[last_picked_index_];
240   return PICK_COMPLETE;
241 }
242
243 //
244 // RoundRobin
245 //
246
247 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
248   gpr_mu_init(&child_refs_mu_);
249   if (grpc_lb_round_robin_trace.enabled()) {
250     gpr_log(GPR_INFO, "[RR %p] Created", this);
251   }
252 }
253
254 RoundRobin::~RoundRobin() {
255   if (grpc_lb_round_robin_trace.enabled()) {
256     gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
257   }
258   gpr_mu_destroy(&child_refs_mu_);
259   GPR_ASSERT(subchannel_list_ == nullptr);
260   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
261 }
262
263 void RoundRobin::ShutdownLocked() {
264   AutoChildRefsUpdater guard(this);
265   if (grpc_lb_round_robin_trace.enabled()) {
266     gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
267   }
268   shutdown_ = true;
269   subchannel_list_.reset();
270   latest_pending_subchannel_list_.reset();
271 }
272
273 void RoundRobin::ResetBackoffLocked() {
274   subchannel_list_->ResetBackoffLocked();
275   if (latest_pending_subchannel_list_ != nullptr) {
276     latest_pending_subchannel_list_->ResetBackoffLocked();
277   }
278 }
279
280 void RoundRobin::FillChildRefsForChannelz(
281     channelz::ChildRefsList* child_subchannels_to_fill,
282     channelz::ChildRefsList* ignored) {
283   MutexLock lock(&child_refs_mu_);
284   for (size_t i = 0; i < child_subchannels_.size(); ++i) {
285     // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
286     // have to implement lightweight set. For now, we don't care about
287     // performance when channelz requests are made.
288     bool found = false;
289     for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
290       if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
291         found = true;
292         break;
293       }
294     }
295     if (!found) {
296       child_subchannels_to_fill->push_back(child_subchannels_[i]);
297     }
298   }
299 }
300
301 void RoundRobin::UpdateChildRefsLocked() {
302   channelz::ChildRefsList cs;
303   if (subchannel_list_ != nullptr) {
304     subchannel_list_->PopulateChildRefsList(&cs);
305   }
306   if (latest_pending_subchannel_list_ != nullptr) {
307     latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
308   }
309   // atomically update the data that channelz will actually be looking at.
310   MutexLock lock(&child_refs_mu_);
311   child_subchannels_ = std::move(cs);
312 }
313
314 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
315   if (num_subchannels() == 0) return;
316   // Check current state of each subchannel synchronously, since any
317   // subchannel already used by some other channel may have a non-IDLE
318   // state.
319   for (size_t i = 0; i < num_subchannels(); ++i) {
320     grpc_error* error = GRPC_ERROR_NONE;
321     grpc_connectivity_state state =
322         subchannel(i)->CheckConnectivityStateLocked(&error);
323     if (state != GRPC_CHANNEL_IDLE) {
324       subchannel(i)->UpdateConnectivityStateLocked(state, error);
325     }
326   }
327   // Start connectivity watch for each subchannel.
328   for (size_t i = 0; i < num_subchannels(); i++) {
329     if (subchannel(i)->subchannel() != nullptr) {
330       subchannel(i)->StartConnectivityWatchLocked();
331     }
332   }
333   // Now set the LB policy's state based on the subchannels' states.
334   UpdateRoundRobinStateFromSubchannelStateCountsLocked();
335 }
336
337 void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
338     grpc_connectivity_state old_state, grpc_connectivity_state new_state,
339     grpc_error* transient_failure_error) {
340   GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
341   GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
342   if (old_state == GRPC_CHANNEL_READY) {
343     GPR_ASSERT(num_ready_ > 0);
344     --num_ready_;
345   } else if (old_state == GRPC_CHANNEL_CONNECTING) {
346     GPR_ASSERT(num_connecting_ > 0);
347     --num_connecting_;
348   } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
349     GPR_ASSERT(num_transient_failure_ > 0);
350     --num_transient_failure_;
351   }
352   if (new_state == GRPC_CHANNEL_READY) {
353     ++num_ready_;
354   } else if (new_state == GRPC_CHANNEL_CONNECTING) {
355     ++num_connecting_;
356   } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
357     ++num_transient_failure_;
358   }
359   GRPC_ERROR_UNREF(last_transient_failure_error_);
360   last_transient_failure_error_ = transient_failure_error;
361 }
362
363 // Sets the RR policy's connectivity state and generates a new picker based
364 // on the current subchannel list.
365 void RoundRobin::RoundRobinSubchannelList::
366     MaybeUpdateRoundRobinConnectivityStateLocked() {
367   RoundRobin* p = static_cast<RoundRobin*>(policy());
368   // Only set connectivity state if this is the current subchannel list.
369   if (p->subchannel_list_.get() != this) return;
370   /* In priority order. The first rule to match terminates the search (ie, if we
371    * are on rule n, all previous rules were unfulfilled).
372    *
373    * 1) RULE: ANY subchannel is READY => policy is READY.
374    *    CHECK: subchannel_list->num_ready > 0.
375    *
376    * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
377    *    CHECK: sd->curr_connectivity_state == CONNECTING.
378    *
379    * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
380    *                                                   TRANSIENT_FAILURE.
381    *    CHECK: subchannel_list->num_transient_failures ==
382    *           subchannel_list->num_subchannels.
383    */
384   if (num_ready_ > 0) {
385     /* 1) READY */
386     p->channel_control_helper()->UpdateState(
387         GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
388         UniquePtr<SubchannelPicker>(New<Picker>(p, this)));
389   } else if (num_connecting_ > 0) {
390     /* 2) CONNECTING */
391     p->channel_control_helper()->UpdateState(
392         GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
393         UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
394   } else if (num_transient_failure_ == num_subchannels()) {
395     /* 3) TRANSIENT_FAILURE */
396     p->channel_control_helper()->UpdateState(
397         GRPC_CHANNEL_TRANSIENT_FAILURE,
398         GRPC_ERROR_REF(last_transient_failure_error_),
399         UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(
400             GRPC_ERROR_REF(last_transient_failure_error_))));
401   }
402 }
403
404 void RoundRobin::RoundRobinSubchannelList::
405     UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
406   RoundRobin* p = static_cast<RoundRobin*>(policy());
407   AutoChildRefsUpdater guard(p);
408   if (num_ready_ > 0) {
409     if (p->subchannel_list_.get() != this) {
410       // Promote this list to p->subchannel_list_.
411       // This list must be p->latest_pending_subchannel_list_, because
412       // any previous update would have been shut down already and
413       // therefore we would not be receiving a notification for them.
414       GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
415       GPR_ASSERT(!shutting_down());
416       if (grpc_lb_round_robin_trace.enabled()) {
417         const size_t old_num_subchannels =
418             p->subchannel_list_ != nullptr
419                 ? p->subchannel_list_->num_subchannels()
420                 : 0;
421         gpr_log(GPR_INFO,
422                 "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
423                 ") in favor of %p (size %" PRIuPTR ")",
424                 p, p->subchannel_list_.get(), old_num_subchannels, this,
425                 num_subchannels());
426       }
427       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
428     }
429   }
430   // Update the RR policy's connectivity state if needed.
431   MaybeUpdateRoundRobinConnectivityStateLocked();
432 }
433
434 void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
435     grpc_connectivity_state connectivity_state, grpc_error* error) {
436   RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
437   if (grpc_lb_round_robin_trace.enabled()) {
438     gpr_log(
439         GPR_INFO,
440         "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
441         "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
442         p, subchannel(), subchannel_list(), Index(),
443         subchannel_list()->num_subchannels(),
444         grpc_connectivity_state_name(last_connectivity_state_),
445         grpc_connectivity_state_name(connectivity_state));
446   }
447   subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
448                                                connectivity_state, error);
449   last_connectivity_state_ = connectivity_state;
450 }
451
452 void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
453     grpc_connectivity_state connectivity_state, grpc_error* error) {
454   RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
455   GPR_ASSERT(subchannel() != nullptr);
456   // If the new state is TRANSIENT_FAILURE, re-resolve.
457   // Only do this if we've started watching, not at startup time.
458   // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
459   // when the subchannel list was created, we'd wind up in a constant
460   // loop of re-resolution.
461   if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
462     if (grpc_lb_round_robin_trace.enabled()) {
463       gpr_log(GPR_INFO,
464               "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
465               "Requesting re-resolution",
466               p, subchannel());
467     }
468     p->channel_control_helper()->RequestReresolution();
469   }
470   // Renew connectivity watch.
471   RenewConnectivityWatchLocked();
472   // Update state counters.
473   UpdateConnectivityStateLocked(connectivity_state, error);
474   // Update overall state and renew notification.
475   subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
476 }
477
478 void RoundRobin::UpdateLocked(UpdateArgs args) {
479   AutoChildRefsUpdater guard(this);
480   if (grpc_lb_round_robin_trace.enabled()) {
481     gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
482             this, args.addresses.size());
483   }
484   // Replace latest_pending_subchannel_list_.
485   if (latest_pending_subchannel_list_ != nullptr) {
486     if (grpc_lb_round_robin_trace.enabled()) {
487       gpr_log(GPR_INFO,
488               "[RR %p] Shutting down previous pending subchannel list %p", this,
489               latest_pending_subchannel_list_.get());
490     }
491   }
492   latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
493       this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args);
494   if (latest_pending_subchannel_list_->num_subchannels() == 0) {
495     // If the new list is empty, immediately promote the new list to the
496     // current list and transition to TRANSIENT_FAILURE.
497     grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update");
498     channel_control_helper()->UpdateState(
499         GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
500         UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
501     subchannel_list_ = std::move(latest_pending_subchannel_list_);
502   } else if (subchannel_list_ == nullptr) {
503     // If there is no current list, immediately promote the new list to
504     // the current list and start watching it.
505     subchannel_list_ = std::move(latest_pending_subchannel_list_);
506     subchannel_list_->StartWatchingLocked();
507   } else {
508     // Start watching the pending list.  It will get swapped into the
509     // current list when it reports READY.
510     latest_pending_subchannel_list_->StartWatchingLocked();
511   }
512 }
513
514 //
515 // factory
516 //
517
518 class RoundRobinFactory : public LoadBalancingPolicyFactory {
519  public:
520   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
521       LoadBalancingPolicy::Args args) const override {
522     return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(std::move(args)));
523   }
524
525   const char* name() const override { return kRoundRobin; }
526 };
527
528 }  // namespace
529
530 }  // namespace grpc_core
531
532 void grpc_lb_policy_round_robin_init() {
533   grpc_core::LoadBalancingPolicyRegistry::Builder::
534       RegisterLoadBalancingPolicyFactory(
535           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
536               grpc_core::New<grpc_core::RoundRobinFactory>()));
537 }
538
// Intentionally empty: there is no module-level state to release here (the
// factory was handed to the registry in grpc_lb_policy_round_robin_init()).
void grpc_lb_policy_round_robin_shutdown() {
}