3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
23 #include <grpc/support/alloc.h>
25 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
27 #include "src/core/ext/filters/client_channel/server_address.h"
28 #include "src/core/ext/filters/client_channel/subchannel.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/sync.h"
31 #include "src/core/lib/iomgr/combiner.h"
32 #include "src/core/lib/iomgr/sockaddr_utils.h"
33 #include "src/core/lib/transport/connectivity_state.h"
// Trace flag guarding the gpr_log() calls in this file (it is tested through
// GRPC_TRACE_FLAG_ENABLED before each of them). Presumably the `false`
// argument means tracing starts out disabled -- TODO confirm against the
// TraceFlag constructor.
37 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
42 // pick_first LB policy
// Policy name string; returned by every name() override in this file.
45 constexpr char kPickFirst[] = "pick_first";
// pick_first load-balancing policy. It keeps the subchannel list currently
// in use, an optional pending replacement list, and a pointer to the
// subchannel selected from the current list.
47 class PickFirst : public LoadBalancingPolicy {
// Forwards `args` to the LoadBalancingPolicy base class.
49 explicit PickFirst(Args args);
// Returns the policy name constant kPickFirst.
51 const char* name() const override { return kPickFirst; }
// LoadBalancingPolicy overrides; their definitions follow the class.
53 void UpdateLocked(UpdateArgs args) override;
54 void ExitIdleLocked() override;
55 void ResetBackoffLocked() override;
// Copies the recorded child subchannel refs into `child_subchannels`; the
// second list is not used by this policy.
56 void FillChildRefsForChannelz(channelz::ChildRefsList* child_subchannels,
57 channelz::ChildRefsList* ignored) override;
62 class PickFirstSubchannelList;
// Per-subchannel state. The constructor only forwards its arguments to
// SubchannelData.
64 class PickFirstSubchannelData
65 : public SubchannelData<PickFirstSubchannelList,
66 PickFirstSubchannelData> {
68 PickFirstSubchannelData(
69 SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
71 const ServerAddress& address, Subchannel* subchannel,
72 grpc_combiner* combiner)
73 : SubchannelData(subchannel_list, address, subchannel, combiner) {}
// Called with the new connectivity state of this subchannel.
75 void ProcessConnectivityChangeLocked(
76 grpc_connectivity_state connectivity_state) override;
78 // Processes the connectivity change to READY for an unselected subchannel.
79 void ProcessUnselectedReadyLocked();
// Checks the current connectivity state, starts the watch, and selects this
// subchannel right away if it is already READY and not yet selected.
81 void CheckConnectivityStateAndStartWatchingLocked();
// List of subchannels built from one address list. It holds a ref on the
// owning policy for its whole lifetime and carries a transient-failure flag.
84 class PickFirstSubchannelList
85 : public SubchannelList<PickFirstSubchannelList,
86 PickFirstSubchannelData> {
88 PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
89 const ServerAddressList& addresses,
90 grpc_combiner* combiner,
91 const grpc_channel_args& args)
92 : SubchannelList(policy, tracer, addresses, combiner,
93 policy->channel_control_helper(), args) {
94 // Need to maintain a ref to the LB policy as long as we maintain
95 // any references to subchannels, since the subchannels'
96 // pollset_sets will include the LB policy's pollset_set.
// The ref is detached from its smart pointer with release(); the matching
// Unref() is issued explicitly in the destructor.
97 policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
100 ~PickFirstSubchannelList() {
101 PickFirst* p = static_cast<PickFirst*>(policy());
102 p->Unref(DEBUG_LOCATION, "subchannel_list");
// Accessors for the flag that ProcessConnectivityChangeLocked() sets and
// clears on the list a notification belongs to.
105 bool in_transient_failure() const { return in_transient_failure_; }
106 void set_in_transient_failure(bool in_transient_failure) {
107 in_transient_failure_ = in_transient_failure;
111 bool in_transient_failure_ = false;
// Picker that answers every pick with the connected subchannel it was
// constructed with.
114 class Picker : public SubchannelPicker {
116 explicit Picker(RefCountedPtr<ConnectedSubchannel> connected_subchannel)
117 : connected_subchannel_(std::move(connected_subchannel)) {}
// Stores the connected subchannel in `pick` and reports PICK_COMPLETE;
// `error` is not written.
119 PickResult Pick(PickArgs* pick, grpc_error** error) override {
120 pick->connected_subchannel = connected_subchannel_;
121 return PICK_COMPLETE;
125 RefCountedPtr<ConnectedSubchannel> connected_subchannel_;
128 // Helper class to ensure that any function that modifies the child refs
129 // data structures will update the channelz snapshot data structures before
// it returns: the update runs from this helper's destructor.
131 class AutoChildRefsUpdater {
133 explicit AutoChildRefsUpdater(PickFirst* pf) : pf_(pf) {}
134 ~AutoChildRefsUpdater() { pf_->UpdateChildRefsLocked(); }
140 void ShutdownLocked() override;
// Rebuilds child_subchannels_ from the current and pending subchannel lists.
142 void UpdateChildRefsLocked();
144 // All our subchannels.
145 OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
146 // Latest pending subchannel list.
147 OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
148 // Selected subchannel in \a subchannel_list_.
149 PickFirstSubchannelData* selected_ = nullptr;
150 // Are we in IDLE state?
// Tested at the top of ExitIdleLocked(), which returns immediately when this
// flag is true.
153 bool shutdown_ = false;
155 /// Lock and data used to capture snapshots of this channel's child
156 /// channels and subchannels. This data is consumed by channelz.
157 Mutex child_refs_mu_;
158 channelz::ChildRefsList child_subchannels_;
159 channelz::ChildRefsList child_channels_;
162 PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
163 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
164 gpr_log(GPR_INFO, "Pick First %p created.", this);
168 PickFirst::~PickFirst() {
169 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
170 gpr_log(GPR_INFO, "Destroying Pick First %p", this);
172 GPR_ASSERT(subchannel_list_ == nullptr);
173 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
176 void PickFirst::ShutdownLocked() {
177 AutoChildRefsUpdater guard(this);
178 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
179 gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
182 subchannel_list_.reset();
183 latest_pending_subchannel_list_.reset();
// Asked to leave the idle state. Does nothing once shutdown_ is set. With no
// current list, or a list holding zero subchannels, the channel is put in
// TRANSIENT_FAILURE with an UNAVAILABLE-status error; the first subchannel of
// the current list is asked to check its state and start being watched.
186 void PickFirst::ExitIdleLocked() {
187 if (shutdown_) return;
// Nothing to connect to: hand the channel a picker that carries the error.
190 if (subchannel_list_ == nullptr ||
191 subchannel_list_->num_subchannels() == 0) {
192 grpc_error* error = grpc_error_set_int(
193 GRPC_ERROR_CREATE_FROM_STATIC_STRING("No addresses to connect to"),
194 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
195 channel_control_helper()->UpdateState(
196 GRPC_CHANNEL_TRANSIENT_FAILURE,
197 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
// NOTE(review): this call must only run when the list is non-null and
// non-empty -- confirm that an else/return separates it from the branch above.
199 subchannel_list_->subchannel(0)
200 ->CheckConnectivityStateAndStartWatchingLocked();
205 void PickFirst::ResetBackoffLocked() {
206 subchannel_list_->ResetBackoffLocked();
207 if (latest_pending_subchannel_list_ != nullptr) {
208 latest_pending_subchannel_list_->ResetBackoffLocked();
212 void PickFirst::FillChildRefsForChannelz(
213 channelz::ChildRefsList* child_subchannels_to_fill,
214 channelz::ChildRefsList* ignored) {
215 MutexLock lock(&child_refs_mu_);
216 for (size_t i = 0; i < child_subchannels_.size(); ++i) {
217 // TODO(ncteisen): implement a de dup loop that is not O(n^2). Might
218 // have to implement lightweight set. For now, we don't care about
219 // performance when channelz requests are made.
221 for (size_t j = 0; j < child_subchannels_to_fill->size(); ++j) {
222 if ((*child_subchannels_to_fill)[j] == child_subchannels_[i]) {
228 child_subchannels_to_fill->push_back(child_subchannels_[i]);
233 void PickFirst::UpdateChildRefsLocked() {
234 channelz::ChildRefsList cs;
235 if (subchannel_list_ != nullptr) {
236 subchannel_list_->PopulateChildRefsList(&cs);
238 if (latest_pending_subchannel_list_ != nullptr) {
239 latest_pending_subchannel_list_->PopulateChildRefsList(&cs);
241 // atomically update the data that channelz will actually be looking at.
242 MutexLock lock(&child_refs_mu_);
243 child_subchannels_ = std::move(cs);
// Applies an update. A new subchannel list is built from args.addresses and
// then installed either as the current list or as the latest pending list,
// depending on whether it is empty, already contains a READY subchannel, or
// whether a subchannel is currently selected.
246 void PickFirst::UpdateLocked(UpdateArgs args) {
// Refreshes the channelz child-ref snapshot when this function returns.
247 AutoChildRefsUpdater guard(this);
248 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
250 "Pick First %p received update with %" PRIuPTR " addresses", this,
251 args.addresses.size());
// The list is created from a copy of the channel args that has
// GRPC_ARG_INHIBIT_HEALTH_CHECKING set to 1; the copy is destroyed as soon as
// the list has been constructed.
253 grpc_arg new_arg = grpc_channel_arg_integer_create(
254 const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
255 grpc_channel_args* new_args =
256 grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
257 auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
258 this, &grpc_lb_pick_first_trace, args.addresses, combiner(), *new_args);
259 grpc_channel_args_destroy(new_args);
260 if (subchannel_list->num_subchannels() == 0) {
261 // Empty update or no valid subchannels. Unsubscribe from all current
// subchannels by replacing the current list with the empty one.
263 subchannel_list_ = std::move(subchannel_list); // Empty list.
265 // If not idle, put the channel in TRANSIENT_FAILURE.
266 // (If we are idle, then this will happen in ExitIdleLocked() if we
267 // haven't gotten a non-empty update by the time the application tries
268 // to start a new call.)
// NOTE(review): the idle check these comments describe should guard the
// state update below -- confirm.
270 grpc_error* error = grpc_error_set_int(
271 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
272 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
273 channel_control_helper()->UpdateState(
274 GRPC_CHANNEL_TRANSIENT_FAILURE,
275 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
279 // If one of the subchannels in the new list is already in state
280 // READY, then select it immediately. This can happen when the
281 // currently selected subchannel is also present in the update. It
282 // can also happen if one of the subchannels in the update is already
283 // in the global subchannel pool because it's in use by another channel.
284 // TODO(roth): If we're in IDLE state, we should probably defer this
285 // check and instead do it in ExitIdleLocked().
286 for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
287 PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
288 grpc_connectivity_state state = sd->CheckConnectivityStateLocked();
289 if (state == GRPC_CHANNEL_READY) {
// `sd` stays valid across the move: it points into the list that is now
// owned by subchannel_list_.
290 subchannel_list_ = std::move(subchannel_list);
291 sd->StartConnectivityWatchLocked();
292 sd->ProcessUnselectedReadyLocked();
293 // If there was a previously pending update (which may or may
294 // not have contained the currently selected subchannel), drop
295 // it, so that it doesn't override what we've done here.
296 latest_pending_subchannel_list_.reset();
297 // Make sure that subsequent calls to ExitIdleLocked() don't cause
298 // us to start watching a subchannel other than the one we've
// just selected.
304 if (selected_ == nullptr) {
305 // We don't yet have a selected subchannel, so replace the current
306 // subchannel list immediately.
307 subchannel_list_ = std::move(subchannel_list);
308 // If we're not in IDLE state, start trying to connect to the first
309 // subchannel in the new list.
311 // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
312 // here, since we've already checked the initial connectivity
313 // state of all subchannels above.
314 subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
317 // We do have a selected subchannel (which means it's READY), so keep
318 // using it until one of the subchannels in the new list reports READY.
319 if (latest_pending_subchannel_list_ != nullptr) {
320 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
322 "Pick First %p Shutting down latest pending subchannel list "
323 "%p, about to be replaced by newer latest %p",
324 this, latest_pending_subchannel_list_.get(),
325 subchannel_list.get());
// Assigning over the old pending list releases it.
328 latest_pending_subchannel_list_ = std::move(subchannel_list);
329 // If we're not in IDLE state, start trying to connect to the first
330 // subchannel in the new list.
332 // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
333 // here, since we've already checked the initial connectivity
334 // state of all subchannels above.
335 latest_pending_subchannel_list_->subchannel(0)
336 ->StartConnectivityWatchLocked();
// Reacts to a connectivity-state change reported for this subchannel.
// Notifications for the currently selected subchannel are handled first; all
// other notifications go through the switch on `connectivity_state` below.
// `connectivity_state` must not be GRPC_CHANNEL_SHUTDOWN (asserted).
341 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
342 grpc_connectivity_state connectivity_state) {
343 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
// Refreshes the policy's channelz child-ref snapshot when this function
// returns.
344 AutoChildRefsUpdater guard(p);
345 // The notification must be for a subchannel in either the current or
346 // latest pending subchannel lists.
347 GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
348 subchannel_list() == p->latest_pending_subchannel_list_.get());
349 GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
350 // Handle updates for the currently selected subchannel.
351 if (p->selected_ == this) {
352 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
354 "Pick First %p selected subchannel connectivity changed to %s", p,
355 grpc_connectivity_state_name(connectivity_state));
357 // If the new state is anything other than READY and there is a
358 // pending update, switch to the pending update.
359 if (connectivity_state != GRPC_CHANNEL_READY &&
360 p->latest_pending_subchannel_list_ != nullptr) {
361 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
363 "Pick First %p promoting pending subchannel list %p to "
365 p, p->latest_pending_subchannel_list_.get(),
366 p->subchannel_list_.get());
// Deselect and stop watching this subchannel before the list that owns it is
// replaced by the pending one.
368 p->selected_ = nullptr;
369 StopConnectivityWatchLocked();
370 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
371 // Set our state to that of the pending subchannel list.
372 if (p->subchannel_list_->in_transient_failure()) {
373 grpc_error* error = grpc_error_set_int(
374 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
375 "selected subchannel failed; switching to pending update"),
376 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
377 p->channel_control_helper()->UpdateState(
378 GRPC_CHANNEL_TRANSIENT_FAILURE,
379 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
// Otherwise the promoted list is still being tried: report CONNECTING with a
// picker that queues picks.
381 p->channel_control_helper()->UpdateState(
382 GRPC_CHANNEL_CONNECTING,
383 UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
386 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
387 // If the selected subchannel goes bad, request a re-resolution. We
388 // also set the channel state to IDLE. The reason is that if the new
389 // state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want
390 // to connect to the re-resolved backends until we leave IDLE state.
392 p->channel_control_helper()->RequestReresolution();
393 p->selected_ = nullptr;
394 StopConnectivityWatchLocked();
395 p->channel_control_helper()->UpdateState(
397 UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
399 // This is unlikely but can happen when a subchannel has been asked
400 // to reconnect by a different channel and this channel has dropped
401 // some connectivity state notifications.
402 if (connectivity_state == GRPC_CHANNEL_READY) {
403 p->channel_control_helper()->UpdateState(
404 GRPC_CHANNEL_READY, UniquePtr<SubchannelPicker>(New<Picker>(
405 connected_subchannel()->Ref())));
406 } else { // CONNECTING
407 p->channel_control_helper()->UpdateState(
409 UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
411 // Renew notification.
412 RenewConnectivityWatchLocked();
417 // If we get here, there are two possible cases:
418 // 1. We do not currently have a selected subchannel, and the update is
419 // for a subchannel in p->subchannel_list_ that we're trying to
420 // connect to. The goal here is to find a subchannel that we can
// select.
422 // 2. We do currently have a selected subchannel, and the update is
423 // for a subchannel in p->latest_pending_subchannel_list_. The
424 // goal here is to find a subchannel from the update that we can
425 // select in place of the current one.
// The flag is cleared for every notification and set again below only once
// the walk through the list has wrapped around to index 0.
426 subchannel_list()->set_in_transient_failure(false);
427 switch (connectivity_state) {
428 case GRPC_CHANNEL_READY: {
429 // Renew notification.
430 RenewConnectivityWatchLocked();
431 ProcessUnselectedReadyLocked();
434 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
435 StopConnectivityWatchLocked();
436 PickFirstSubchannelData* sd = this;
// Advance to the following subchannel, wrapping around at the end of the
// list.
438 (sd->Index() + 1) % subchannel_list()->num_subchannels();
439 sd = subchannel_list()->subchannel(next_index);
440 // If we've tried all subchannels, set state to TRANSIENT_FAILURE.
441 if (sd->Index() == 0) {
442 // Re-resolve if this is the most recent subchannel list.
// "Most recent" is the pending list when one exists, else the current list.
443 if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr
444 ? p->latest_pending_subchannel_list_.get()
445 : p->subchannel_list_.get())) {
446 p->channel_control_helper()->RequestReresolution();
448 subchannel_list()->set_in_transient_failure(true);
449 // Only report new state in case 1.
450 if (subchannel_list() == p->subchannel_list_.get()) {
451 grpc_error* error = grpc_error_set_int(
452 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
453 "failed to connect to all addresses"),
454 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
455 p->channel_control_helper()->UpdateState(
456 GRPC_CHANNEL_TRANSIENT_FAILURE,
457 UniquePtr<SubchannelPicker>(New<TransientFailurePicker>(error)));
// Move on to the subchannel chosen above.
460 sd->CheckConnectivityStateAndStartWatchingLocked();
463 case GRPC_CHANNEL_CONNECTING:
464 case GRPC_CHANNEL_IDLE: {
465 // Only update connectivity state in case 1.
466 if (subchannel_list() == p->subchannel_list_.get()) {
467 p->channel_control_helper()->UpdateState(
468 GRPC_CHANNEL_CONNECTING,
469 UniquePtr<SubchannelPicker>(New<QueuePicker>(p->Ref())));
471 // Renew notification.
472 RenewConnectivityWatchLocked();
// Excluded by the assertion at the top of this function.
475 case GRPC_CHANNEL_SHUTDOWN:
476 GPR_UNREACHABLE_CODE(break);
// Handles a READY subchannel that is not the selected one. If the subchannel
// belongs to the latest pending list, that list is promoted to be the current
// list. The channel is then given a Picker wrapping this subchannel's
// connected subchannel.
480 void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
481 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
482 // If we get here, there are two possible cases:
483 // 1. We do not currently have a selected subchannel, and the update is
484 // for a subchannel in p->subchannel_list_ that we're trying to
485 // connect to. The goal here is to find a subchannel that we can
// select.
487 // 2. We do currently have a selected subchannel, and the update is
488 // for a subchannel in p->latest_pending_subchannel_list_. The
489 // goal here is to find a subchannel from the update that we can
490 // select in place of the current one.
491 GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
492 subchannel_list() == p->latest_pending_subchannel_list_.get());
493 // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
494 if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
495 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
497 "Pick First %p promoting pending subchannel list %p to "
499 p, p->latest_pending_subchannel_list_.get(),
500 p->subchannel_list_.get());
// The move-assignment releases the previous current list.
502 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Both cases: publish a picker for this subchannel.
// NOTE(review): p->selected_ is expected to be pointed at this subchannel
// around here, and the state passed to UpdateState is presumably
// GRPC_CHANNEL_READY -- confirm.
506 p->channel_control_helper()->UpdateState(
508 UniquePtr<SubchannelPicker>(New<Picker>(connected_subchannel()->Ref())));
509 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
510 gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
514 void PickFirst::PickFirstSubchannelData::
515 CheckConnectivityStateAndStartWatchingLocked() {
516 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
517 // Check current state.
518 grpc_connectivity_state current_state = CheckConnectivityStateLocked();
520 StartConnectivityWatchLocked();
521 // If current state is READY, select the subchannel now, since we started
522 // watching from this state and will not get a notification of it
523 // transitioning into this state.
524 if (p->selected_ != this && current_state == GRPC_CHANNEL_READY) {
525 ProcessUnselectedReadyLocked();
529 class ParsedPickFirstConfig : public ParsedLoadBalancingConfig {
531 const char* name() const override { return kPickFirst; }
538 class PickFirstFactory : public LoadBalancingPolicyFactory {
540 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
541 LoadBalancingPolicy::Args args) const override {
542 return OrphanablePtr<LoadBalancingPolicy>(New<PickFirst>(std::move(args)));
545 const char* name() const override { return kPickFirst; }
547 RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
548 const grpc_json* json, grpc_error** error) const override {
549 if (json != nullptr) {
550 GPR_DEBUG_ASSERT(strcmp(json->key, name()) == 0);
552 return RefCountedPtr<ParsedLoadBalancingConfig>(
553 New<ParsedPickFirstConfig>());
559 } // namespace grpc_core
561 void grpc_lb_policy_pick_first_init() {
562 grpc_core::LoadBalancingPolicyRegistry::Builder::
563 RegisterLoadBalancingPolicyFactory(
564 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
565 grpc_core::New<grpc_core::PickFirstFactory>()));
// Shutdown counterpart of grpc_lb_policy_pick_first_init(). The body is
// empty: this function performs no work.
568 void grpc_lb_policy_pick_first_shutdown() {}