3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
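/** Round Robin Policy.
 *
 * Before every pick, the \a get_next_ready_subchannel_index_locked function
 * returns the p->subchannel_list->subchannels index for next subchannel,
 * respecting the relative order of the addresses provided upon creation or
 * updates. Note however that updates will start picking from the beginning of
 * the updated list.
 *
 * NOTE(review): no function named get_next_ready_subchannel_index_locked is
 * defined in this file, and the Picker constructor seeds its starting index
 * with rand(); this paragraph looks stale -- confirm and update. */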
27 #include <grpc/support/port_platform.h>
32 #include <grpc/support/alloc.h>
34 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
35 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
36 #include "src/core/ext/filters/client_channel/subchannel.h"
37 #include "src/core/lib/channel/channel_args.h"
38 #include "src/core/lib/debug/trace.h"
39 #include "src/core/lib/gprpp/mutex_lock.h"
40 #include "src/core/lib/gprpp/ref_counted_ptr.h"
41 #include "src/core/lib/iomgr/combiner.h"
42 #include "src/core/lib/iomgr/sockaddr_utils.h"
43 #include "src/core/lib/transport/connectivity_state.h"
44 #include "src/core/lib/transport/static_metadata.h"
48 TraceFlag grpc_lb_round_robin_trace(false, "round_robin");
53 // round_robin LB policy
56 constexpr char kRoundRobin[] = "round_robin";
58 class RoundRobin : public LoadBalancingPolicy {
60 explicit RoundRobin(Args args);
62 const char* name() const override { return kRoundRobin; }
64 void UpdateLocked(UpdateArgs args) override;
65 void ResetBackoffLocked() override;
66 void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
67 channelz::ChildRefsList* ignored) override;
72 // Forward declaration.
73 class RoundRobinSubchannelList;
75 // Data for a particular subchannel in a subchannel list.
76 // This subclass adds the following functionality:
77 // - Tracks the previous connectivity state of the subchannel, so that
78 // we know how many subchannels are in each state.
79 class RoundRobinSubchannelData
80 : public SubchannelData<RoundRobinSubchannelList,
81 RoundRobinSubchannelData> {
83 RoundRobinSubchannelData(
84 SubchannelList<RoundRobinSubchannelList, RoundRobinSubchannelData>*
86 const ServerAddress& address, Subchannel* subchannel,
87 grpc_combiner* combiner)
88 : SubchannelData(subchannel_list, address, subchannel, combiner) {}
90 grpc_connectivity_state connectivity_state() const {
91 return last_connectivity_state_;
94 void UpdateConnectivityStateLocked(
95 grpc_connectivity_state connectivity_state, grpc_error* error);
98 void ProcessConnectivityChangeLocked(
99 grpc_connectivity_state connectivity_state, grpc_error* error) override;
101 grpc_connectivity_state last_connectivity_state_ = GRPC_CHANNEL_IDLE;
104 // A list of subchannels.
105 class RoundRobinSubchannelList
106 : public SubchannelList<RoundRobinSubchannelList,
107 RoundRobinSubchannelData> {
109 RoundRobinSubchannelList(RoundRobin* policy, TraceFlag* tracer,
110 const ServerAddressList& addresses,
111 grpc_combiner* combiner,
112 const grpc_channel_args& args)
113 : SubchannelList(policy, tracer, addresses, combiner,
114 policy->channel_control_helper(), args) {
115 // Need to maintain a ref to the LB policy as long as we maintain
116 // any references to subchannels, since the subchannels'
117 // pollset_sets will include the LB policy's pollset_set.
118 policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
121 ~RoundRobinSubchannelList() {
122 GRPC_ERROR_UNREF(last_transient_failure_error_);
123 RoundRobin* p = static_cast<RoundRobin*>(policy());
124 p->Unref(DEBUG_LOCATION, "subchannel_list");
127 // Starts watching the subchannels in this list.
128 void StartWatchingLocked();
130 // Updates the counters of subchannels in each state when a
131 // subchannel transitions from old_state to new_state.
132 // transient_failure_error is the error that is reported when
133 // new_state is TRANSIENT_FAILURE.
134 void UpdateStateCountersLocked(grpc_connectivity_state old_state,
135 grpc_connectivity_state new_state,
136 grpc_error* transient_failure_error);
138 // If this subchannel list is the RR policy's current subchannel
139 // list, updates the RR policy's connectivity state based on the
140 // subchannel list's state counters.
141 void MaybeUpdateRoundRobinConnectivityStateLocked();
143 // Updates the RR policy's overall state based on the counters of
144 // subchannels in each state.
145 void UpdateRoundRobinStateFromSubchannelStateCountsLocked();
148 size_t num_ready_ = 0;
149 size_t num_connecting_ = 0;
150 size_t num_transient_failure_ = 0;
151 grpc_error* last_transient_failure_error_ = GRPC_ERROR_NONE;
154 class Picker : public SubchannelPicker {
156 Picker(RoundRobin* parent, RoundRobinSubchannelList* subchannel_list);
158 PickResult Pick(PickArgs* pick, grpc_error** error) override;
161 // Using pointer value only, no ref held -- do not dereference!
164 size_t last_picked_index_;
165 InlinedVector<RefCountedPtr<ConnectedSubchannel>, 10> subchannels_;
168 // Helper class to ensure that any function that modifies the child refs
169 // data structures will update the channelz snapshot data structures before
171 class AutoChildRefsUpdater {
173 explicit AutoChildRefsUpdater(RoundRobin* rr) : rr_(rr) {}
174 ~AutoChildRefsUpdater() { rr_->UpdateChildRefsLocked(); }
180 void ShutdownLocked() override;
182 void UpdateChildRefsLocked();
184 /** list of subchannels */
185 OrphanablePtr<RoundRobinSubchannelList> subchannel_list_;
186 /** Latest version of the subchannel list.
187 * Subchannel connectivity callbacks will only promote updated subchannel
188 * lists if they equal \a latest_pending_subchannel_list. In other words,
189 * racing callbacks that reference outdated subchannel lists won't perform any
191 OrphanablePtr<RoundRobinSubchannelList> latest_pending_subchannel_list_;
192 /** are we shutting down? */
193 bool shutdown_ = false;
194 /// Lock and data used to capture snapshots of this channel's child
195 /// channels and subchannels. This data is consumed by channelz.
196 gpr_mu child_refs_mu_;
197 channelz::ChildRefsList child_subchannels_;
198 channelz::ChildRefsList child_channels_;
202 // RoundRobin::Picker
205 RoundRobin::Picker::Picker(RoundRobin* parent,
206 RoundRobinSubchannelList* subchannel_list)
208 for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
209 auto* connected_subchannel =
210 subchannel_list->subchannel(i)->connected_subchannel();
211 if (connected_subchannel != nullptr) {
212 subchannels_.push_back(connected_subchannel->Ref());
215 // For discussion on why we generate a random starting index for
216 // the picker, see https://github.com/grpc/grpc-go/issues/2580.
217 // TODO(roth): rand(3) is not thread-safe. This should be replaced with
218 // something better as part of https://github.com/grpc/grpc/issues/17891.
219 last_picked_index_ = rand() % subchannels_.size();
220 if (grpc_lb_round_robin_trace.enabled()) {
222 "[RR %p picker %p] created picker from subchannel_list=%p "
223 "with %" PRIuPTR " READY subchannels; last_picked_index_=%" PRIuPTR,
224 parent_, this, subchannel_list, subchannels_.size(),
229 RoundRobin::PickResult RoundRobin::Picker::Pick(PickArgs* pick,
230 grpc_error** error) {
231 last_picked_index_ = (last_picked_index_ + 1) % subchannels_.size();
232 if (grpc_lb_round_robin_trace.enabled()) {
234 "[RR %p picker %p] returning index %" PRIuPTR
235 ", connected_subchannel=%p",
236 parent_, this, last_picked_index_,
237 subchannels_[last_picked_index_].get());
239 pick->connected_subchannel = subchannels_[last_picked_index_];
240 return PICK_COMPLETE;
247 RoundRobin::RoundRobin(Args args) : LoadBalancingPolicy(std::move(args)) {
248 gpr_mu_init(&child_refs_mu_);
249 if (grpc_lb_round_robin_trace.enabled()) {
250 gpr_log(GPR_INFO, "[RR %p] Created", this);
254 RoundRobin::~RoundRobin() {
255 if (grpc_lb_round_robin_trace.enabled()) {
256 gpr_log(GPR_INFO, "[RR %p] Destroying Round Robin policy", this);
258 gpr_mu_destroy(&child_refs_mu_);
259 GPR_ASSERT(subchannel_list_ == nullptr);
260 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
263 void RoundRobin::ShutdownLocked() {
264 AutoChildRefsUpdater guard(this);
265 if (grpc_lb_round_robin_trace.enabled()) {
266 gpr_log(GPR_INFO, "[RR %p] Shutting down", this);
269 subchannel_list_.reset();
270 latest_pending_subchannel_list_.reset();
273 void RoundRobin::ResetBackoffLocked() {
274 subchannel_list_->ResetBackoffLocked();
275 if (latest_pending_subchannel_list_ != nullptr) {
276 latest_pending_subchannel_list_->ResetBackoffLocked();
280 void RoundRobin::FillChildRefsForChannelz(
281 channelz::ChildRefsList* child_subchannels_to_fill,
282 channelz::ChildRefsList* ignored) {
283 MutexLock lock(&child_refs_mu_);
284 for (size_t i = 0; i < child_subchannels_.size(); ++i) {
285 // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
286 // have to implement lightweight set. For now, we don't care about
287 // performance when channelz requests are made.
289 for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
290 if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
296 child_subchannels_to_fill->push_back(child_subchannels_[i]);
301 void RoundRobin::UpdateChildRefsLocked() {
302 channelz::ChildRefsList cs;
303 if (subchannel_list_ != nullptr) {
304 subchannel_list_->PopulateChildRefsList(&cs);
306 if (latest_pending_subchannel_list_ != nullptr) {
307 latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
309 // atomically update the data that channelz will actually be looking at.
310 MutexLock lock(&child_refs_mu_);
311 child_subchannels_ = std::move(cs);
314 void RoundRobin::RoundRobinSubchannelList::StartWatchingLocked() {
315 if (num_subchannels() == 0) return;
316 // Check current state of each subchannel synchronously, since any
317 // subchannel already used by some other channel may have a non-IDLE
319 for (size_t i = 0; i < num_subchannels(); ++i) {
320 grpc_error* error = GRPC_ERROR_NONE;
321 grpc_connectivity_state state =
322 subchannel(i)->CheckConnectivityStateLocked(&error);
323 if (state != GRPC_CHANNEL_IDLE) {
324 subchannel(i)->UpdateConnectivityStateLocked(state, error);
327 // Start connectivity watch for each subchannel.
328 for (size_t i = 0; i < num_subchannels(); i++) {
329 if (subchannel(i)->subchannel() != nullptr) {
330 subchannel(i)->StartConnectivityWatchLocked();
333 // Now set the LB policy's state based on the subchannels' states.
334 UpdateRoundRobinStateFromSubchannelStateCountsLocked();
337 void RoundRobin::RoundRobinSubchannelList::UpdateStateCountersLocked(
338 grpc_connectivity_state old_state, grpc_connectivity_state new_state,
339 grpc_error* transient_failure_error) {
340 GPR_ASSERT(old_state != GRPC_CHANNEL_SHUTDOWN);
341 GPR_ASSERT(new_state != GRPC_CHANNEL_SHUTDOWN);
342 if (old_state == GRPC_CHANNEL_READY) {
343 GPR_ASSERT(num_ready_ > 0);
345 } else if (old_state == GRPC_CHANNEL_CONNECTING) {
346 GPR_ASSERT(num_connecting_ > 0);
348 } else if (old_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
349 GPR_ASSERT(num_transient_failure_ > 0);
350 --num_transient_failure_;
352 if (new_state == GRPC_CHANNEL_READY) {
354 } else if (new_state == GRPC_CHANNEL_CONNECTING) {
356 } else if (new_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
357 ++num_transient_failure_;
359 GRPC_ERROR_UNREF(last_transient_failure_error_);
360 last_transient_failure_error_ = transient_failure_error;
363 // Sets the RR policy's connectivity state and generates a new picker based
364 // on the current subchannel list.
365 void RoundRobin::RoundRobinSubchannelList::
366 MaybeUpdateRoundRobinConnectivityStateLocked() {
367 RoundRobin* p = static_cast<RoundRobin*>(policy());
368 // Only set connectivity state if this is the current subchannel list.
369 if (p->subchannel_list_.get() != this) return;
370 /* In priority order. The first rule to match terminates the search (ie, if we
371 * are on rule n, all previous rules were unfulfilled).
373 * 1) RULE: ANY subchannel is READY => policy is READY.
374 * CHECK: subchannel_list->num_ready > 0.
376 * 2) RULE: ANY subchannel is CONNECTING => policy is CONNECTING.
377 * CHECK: sd->curr_connectivity_state == CONNECTING.
379 * 3) RULE: ALL subchannels are TRANSIENT_FAILURE => policy is
381 * CHECK: subchannel_list->num_transient_failures ==
382 * subchannel_list->num_subchannels.
384 if (num_ready_ > 0) {
386 p->channel_control_helper()->UpdateState(
387 GRPC_CHANNEL_READY, GRPC_ERROR_NONE,
388 UniquePtr<SubchannelPicker>(New<Picker>(p, this)));
389 } else if (num_connecting_ > 0) {
391 p->channel_control_helper()->UpdateState(
392 GRPC_CHANNEL_CONNECTING, GRPC_ERROR_NONE,
393 UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
394 } else if (num_transient_failure_ == num_subchannels()) {
395 /* 3) TRANSIENT_FAILURE */
396 p->channel_control_helper()->UpdateState(
397 GRPC_CHANNEL_TRANSIENT_FAILURE,
398 GRPC_ERROR_REF(last_transient_failure_error_),
399 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(
400 GRPC_ERROR_REF(last_transient_failure_error_))));
404 void RoundRobin::RoundRobinSubchannelList::
405 UpdateRoundRobinStateFromSubchannelStateCountsLocked() {
406 RoundRobin* p = static_cast<RoundRobin*>(policy());
407 AutoChildRefsUpdater guard(p);
408 if (num_ready_ > 0) {
409 if (p->subchannel_list_.get() != this) {
410 // Promote this list to p->subchannel_list_.
411 // This list must be p->latest_pending_subchannel_list_, because
412 // any previous update would have been shut down already and
413 // therefore we would not be receiving a notification for them.
414 GPR_ASSERT(p->latest_pending_subchannel_list_.get() == this);
415 GPR_ASSERT(!shutting_down());
416 if (grpc_lb_round_robin_trace.enabled()) {
417 const size_t old_num_subchannels =
418 p->subchannel_list_ != nullptr
419 ? p->subchannel_list_->num_subchannels()
422 "[RR %p] phasing out subchannel list %p (size %" PRIuPTR
423 ") in favor of %p (size %" PRIuPTR ")",
424 p, p->subchannel_list_.get(), old_num_subchannels, this,
427 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
430 // Update the RR policy's connectivity state if needed.
431 MaybeUpdateRoundRobinConnectivityStateLocked();
434 void RoundRobin::RoundRobinSubchannelData::UpdateConnectivityStateLocked(
435 grpc_connectivity_state connectivity_state, grpc_error* error) {
436 RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
437 if (grpc_lb_round_robin_trace.enabled()) {
440 "[RR %p] connectivity changed for subchannel %p, subchannel_list %p "
441 "(index %" PRIuPTR " of %" PRIuPTR "): prev_state=%s new_state=%s",
442 p, subchannel(), subchannel_list(), Index(),
443 subchannel_list()->num_subchannels(),
444 grpc_connectivity_state_name(last_connectivity_state_),
445 grpc_connectivity_state_name(connectivity_state));
447 subchannel_list()->UpdateStateCountersLocked(last_connectivity_state_,
448 connectivity_state, error);
449 last_connectivity_state_ = connectivity_state;
452 void RoundRobin::RoundRobinSubchannelData::ProcessConnectivityChangeLocked(
453 grpc_connectivity_state connectivity_state, grpc_error* error) {
454 RoundRobin* p = static_cast<RoundRobin*>(subchannel_list()->policy());
455 GPR_ASSERT(subchannel() != nullptr);
456 // If the new state is TRANSIENT_FAILURE, re-resolve.
457 // Only do this if we've started watching, not at startup time.
458 // Otherwise, if the subchannel was already in state TRANSIENT_FAILURE
459 // when the subchannel list was created, we'd wind up in a constant
460 // loop of re-resolution.
461 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
462 if (grpc_lb_round_robin_trace.enabled()) {
464 "[RR %p] Subchannel %p has gone into TRANSIENT_FAILURE. "
465 "Requesting re-resolution",
468 p->channel_control_helper()->RequestReresolution();
470 // Renew connectivity watch.
471 RenewConnectivityWatchLocked();
472 // Update state counters.
473 UpdateConnectivityStateLocked(connectivity_state, error);
474 // Update overall state and renew notification.
475 subchannel_list()->UpdateRoundRobinStateFromSubchannelStateCountsLocked();
478 void RoundRobin::UpdateLocked(UpdateArgs args) {
479 AutoChildRefsUpdater guard(this);
480 if (grpc_lb_round_robin_trace.enabled()) {
481 gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
482 this, args.addresses.size());
484 // Replace latest_pending_subchannel_list_.
485 if (latest_pending_subchannel_list_ != nullptr) {
486 if (grpc_lb_round_robin_trace.enabled()) {
488 "[RR %p] Shutting down previous pending subchannel list %p", this,
489 latest_pending_subchannel_list_.get());
492 latest_pending_subchannel_list_ = MakeOrphanable<RoundRobinSubchannelList>(
493 this, &grpc_lb_round_robin_trace, args.addresses, combiner(), *args.args);
494 if (latest_pending_subchannel_list_->num_subchannels() == 0) {
495 // If the new list is empty, immediately promote the new list to the
496 // current list and transition to TRANSIENT_FAILURE.
497 grpc_error* error = GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update");
498 channel_control_helper()->UpdateState(
499 GRPC_CHANNEL_TRANSIENT_FAILURE, GRPC_ERROR_REF(error),
500 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
501 subchannel_list_ = std::move(latest_pending_subchannel_list_);
502 } else if (subchannel_list_ == nullptr) {
503 // If there is no current list, immediately promote the new list to
504 // the current list and start watching it.
505 subchannel_list_ = std::move(latest_pending_subchannel_list_);
506 subchannel_list_->StartWatchingLocked();
508 // Start watching the pending list. It will get swapped into the
509 // current list when it reports READY.
510 latest_pending_subchannel_list_->StartWatchingLocked();
518 class RoundRobinFactory : public LoadBalancingPolicyFactory {
520 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
521 LoadBalancingPolicy::Args args) const override {
522 return OrphanablePtr<LoadBalancingPolicy>(New<RoundRobin>(std::move(args)));
525 const char* name() const override { return kRoundRobin; }
530 } // namespace grpc_core
532 void grpc_lb_policy_round_robin_init() {
533 grpc_core::LoadBalancingPolicyRegistry::Builder::
534 RegisterLoadBalancingPolicyFactory(
535 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
536 grpc_core::New<grpc_core::RoundRobinFactory>()));
// Counterpart of grpc_lb_policy_round_robin_init(); intentionally empty.
void grpc_lb_policy_round_robin_shutdown() {}