3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
23 #include <grpc/support/alloc.h>
25 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
27 #include "src/core/ext/filters/client_channel/server_address.h"
28 #include "src/core/ext/filters/client_channel/subchannel.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/sync.h"
31 #include "src/core/lib/iomgr/sockaddr_utils.h"
32 #include "src/core/lib/transport/connectivity_state.h"
33 #include "src/core/lib/transport/error_utils.h"
// Trace flag named "pick_first"; the functions in this file test it with
// GRPC_TRACE_FLAG_ENABLED() before each gpr_log() call.
37 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
42 // pick_first LB policy
// Name returned by name() on the policy, its config and its factory.
45 constexpr char kPickFirst[] = "pick_first";
// pick_first policy: tracks a current subchannel list, an optional pending
// list created from a newer update, and the currently selected subchannel.
// NOTE(review): the numbering embedded in this listing skips several lines
// inside this class (for example 56-59, 68 and 140-141), so some declarations
// are not visible here — confirm against the original file.
47 class PickFirst : public LoadBalancingPolicy {
49 explicit PickFirst(Args args);
// Reports the policy name, "pick_first".
51 const char* name() const override { return kPickFirst; }
// LoadBalancingPolicy overrides; all three are defined out of line below the
// class.
53 void UpdateLocked(UpdateArgs args) override;
54 void ExitIdleLocked() override;
55 void ResetBackoffLocked() override;
60 class PickFirstSubchannelList;
// Per-subchannel entry of a PickFirstSubchannelList. Its ...Locked() members
// are defined out of line below the class.
62 class PickFirstSubchannelData
63 : public SubchannelData<PickFirstSubchannelList,
64 PickFirstSubchannelData> {
// Forwards all three arguments to the SubchannelData base.
// NOTE(review): the `subchannel_list` parameter name used in the initializer
// is on a line (68) that is not visible in this listing.
66 PickFirstSubchannelData(
67 SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
69 const ServerAddress& address,
70 RefCountedPtr<SubchannelInterface> subchannel)
71 : SubchannelData(subchannel_list, address, std::move(subchannel)) {}
// Handles a connectivity state notification for this subchannel.
73 void ProcessConnectivityChangeLocked(
74 grpc_connectivity_state connectivity_state) override;
76 // Processes the connectivity change to READY for an unselected subchannel.
77 void ProcessUnselectedReadyLocked();
// Reads the current state, starts the watch, and then either selects the
// subchannel (if READY) or asks it to connect.
79 void CheckConnectivityStateAndStartWatchingLocked();
// List of PickFirstSubchannelData entries built from a set of addresses. It
// additionally records whether the whole list is in transient failure.
82 class PickFirstSubchannelList
83 : public SubchannelList<PickFirstSubchannelList,
84 PickFirstSubchannelData> {
86 PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
87 ServerAddressList addresses,
88 const grpc_channel_args& args)
89 : SubchannelList(policy, tracer, std::move(addresses),
90 policy->channel_control_helper(), args) {
91 // Need to maintain a ref to the LB policy as long as we maintain
92 // any references to subchannels, since the subchannels'
93 // pollset_sets will include the LB policy's pollset_set.
// The returned ref is release()d so it is not dropped automatically; the
// matching Unref() is in the destructor below.
94 policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
// Drops the policy ref taken in the constructor.
97 ~PickFirstSubchannelList() {
98 PickFirst* p = static_cast<PickFirst*>(policy());
99 p->Unref(DEBUG_LOCATION, "subchannel_list");
// Accessors for the transient-failure flag. The flag is written by
// PickFirstSubchannelData::ProcessConnectivityChangeLocked() and read when a
// pending list is promoted after the selected subchannel fails.
102 bool in_transient_failure() const { return in_transient_failure_; }
103 void set_in_transient_failure(bool in_transient_failure) {
104 in_transient_failure_ = in_transient_failure;
108 bool in_transient_failure_ = false;
// Picker holding a single subchannel; Pick() marks the result PICK_COMPLETE
// and hands out that subchannel, ignoring the pick arguments.
// NOTE(review): the declaration and return of `result` (lines 117 and
// 120-121) are not visible in this listing.
111 class Picker : public SubchannelPicker {
113 explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
114 : subchannel_(std::move(subchannel)) {}
116 PickResult Pick(PickArgs /*args*/) override {
118 result.type = PickResult::PICK_COMPLETE;
119 result.subchannel = subchannel_;
124 RefCountedPtr<SubchannelInterface> subchannel_;
127 void ShutdownLocked() override;
// Builds a subchannel list from latest_update_args_ and adopts it as either
// the current or the pending list.
129 void AttemptToConnectUsingLatestUpdateArgsLocked();
131 // Latest update args.
132 UpdateArgs latest_update_args_;
133 // All our subchannels.
134 OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
135 // Latest pending subchannel list.
136 OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
137 // Selected subchannel in \a subchannel_list_.
138 PickFirstSubchannelData* selected_ = nullptr;
139 // Are we in IDLE state?
// NOTE(review): the member described by the comment above is not visible in
// this listing (the embedded numbering jumps from 139 to 142). The flag below
// is the one ExitIdleLocked() tests before doing any work.
142 bool shutdown_ = false;
145 PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
146 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
147 gpr_log(GPR_INFO, "Pick First %p created.", this);
151 PickFirst::~PickFirst() {
152 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
153 gpr_log(GPR_INFO, "Destroying Pick First %p", this);
155 GPR_ASSERT(subchannel_list_ == nullptr);
156 GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
// Shuts the policy down: logs when pick_first tracing is enabled and resets
// both the current and the pending subchannel list.
// NOTE(review): the numbering embedded in this listing jumps from 161 to 164,
// so at least one statement is not visible here (possibly the write to
// shutdown_ that ExitIdleLocked() reads) — confirm against the original file.
159 void PickFirst::ShutdownLocked() {
160 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
161 gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
164 subchannel_list_.reset();
165 latest_pending_subchannel_list_.reset();
// Leaves idle: does nothing once the policy is shut down; otherwise logs
// (when tracing is enabled) and starts a connection attempt from the most
// recent update.
// NOTE(review): lines 170 and 173-174 of the embedded numbering are not
// visible, so a condition guarding the log and the connection attempt may be
// missing from this listing — confirm against the original file.
168 void PickFirst::ExitIdleLocked() {
169 if (shutdown_) return;
171 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
172 gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
175 AttemptToConnectUsingLatestUpdateArgsLocked();
179 void PickFirst::ResetBackoffLocked() {
180 if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
181 if (latest_pending_subchannel_list_ != nullptr) {
182 latest_pending_subchannel_list_->ResetBackoffLocked();
// Builds a new subchannel list from latest_update_args_ and adopts it:
//  - empty list: it replaces subchannel_list_ and TRANSIENT_FAILURE is
//    reported with an "Empty update" / UNAVAILABLE error;
//  - some subchannel already READY: the list replaces subchannel_list_, that
//    subchannel is watched and processed as an unselected READY subchannel,
//    and any pending list is dropped;
//  - no selected subchannel: the list replaces subchannel_list_ and a
//    connection attempt is started on its first subchannel;
//  - otherwise: the list becomes latest_pending_subchannel_list_ and a
//    connection attempt is started on its first subchannel.
// NOTE(review): the numbering embedded in this listing skips lines between
// these cases (195, 200, 206-207, 224-226, 238, 243, 248-249, 259), so the
// returns, else branches and extra conditions that separate them are not all
// visible — confirm against the original file.
186 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
187 // Create a subchannel list from the latest_update_args_.
188 auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
189 this, &grpc_lb_pick_first_trace, latest_update_args_.addresses,
190 *latest_update_args_.args);
191 // Empty update or no valid subchannels.
192 if (subchannel_list->num_subchannels() == 0) {
193 // Unsubscribe from all current subchannels.
194 subchannel_list_ = std::move(subchannel_list); // Empty list.
196 // If not idle, put the channel in TRANSIENT_FAILURE.
197 // (If we are idle, then this will happen in ExitIdleLocked() if we
198 // haven't gotten a non-empty update by the time the application tries
199 // to start a new call.)
// NOTE(review): the declaration that receives this error (line 200) is not
// visible; the calls below refer to it as `error`.
201 grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
202 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
// The same error is converted to a status for UpdateState() and also handed
// to the TransientFailurePicker.
203 channel_control_helper()->UpdateState(
204 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
205 absl::make_unique<TransientFailurePicker>(error));
208 // If one of the subchannels in the new list is already in state
209 // READY, then select it immediately. This can happen when the
210 // currently selected subchannel is also present in the update. It
211 // can also happen if one of the subchannels in the update is already
212 // in the global subchannel pool because it's in use by another channel.
213 for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
214 PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
215 grpc_connectivity_state state = sd->CheckConnectivityStateLocked();
216 if (state == GRPC_CHANNEL_READY) {
// `sd` points into the list that was just moved into subchannel_list_, so it
// is still owned by the policy after the move.
217 subchannel_list_ = std::move(subchannel_list);
218 sd->StartConnectivityWatchLocked();
219 sd->ProcessUnselectedReadyLocked();
220 // If there was a previously pending update (which may or may
221 // not have contained the currently selected subchannel), drop
222 // it, so that it doesn't override what we've done here.
223 latest_pending_subchannel_list_.reset();
227 if (selected_ == nullptr) {
228 // We don't yet have a selected subchannel, so replace the current
229 // subchannel list immediately.
230 subchannel_list_ = std::move(subchannel_list);
231 // If we're not in IDLE state, start trying to connect to the first
232 // subchannel in the new list.
233 // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
234 // here, since we've already checked the initial connectivity
235 // state of all subchannels above.
236 subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
237 subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect();
239 // We do have a selected subchannel (which means it's READY), so keep
240 // using it until one of the subchannels in the new list reports READY.
241 if (latest_pending_subchannel_list_ != nullptr) {
242 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
// NOTE(review): the call that consumes this format string (line 243) is not
// visible in this listing.
244 "Pick First %p Shutting down latest pending subchannel list "
245 "%p, about to be replaced by newer latest %p",
246 this, latest_pending_subchannel_list_.get(),
247 subchannel_list.get());
// Assigning over the OrphanablePtr replaces any previously pending list.
250 latest_pending_subchannel_list_ = std::move(subchannel_list);
251 // If we're not in IDLE state, start trying to connect to the first
252 // subchannel in the new list.
253 // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
254 // here, since we've already checked the initial connectivity
255 // state of all subchannels above.
256 latest_pending_subchannel_list_->subchannel(0)
257 ->StartConnectivityWatchLocked();
258 latest_pending_subchannel_list_->subchannel(0)
260 ->AttemptToConnect();
// Accepts a new update: adds GRPC_ARG_INHIBIT_HEALTH_CHECKING=1 to the
// update's channel args, stores the update in latest_update_args_, and starts
// a connection attempt from it.
// NOTE(review): lines 266 and 280 of the embedded numbering are not visible
// (the log call and a condition guarding the connection attempt, judging by
// the surrounding comments) — confirm against the original file.
264 void PickFirst::UpdateLocked(UpdateArgs args) {
265 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
267 "Pick First %p received update with %" PRIuPTR " addresses", this,
268 args.addresses.size());
270 // Update the latest_update_args_
271 grpc_arg new_arg = grpc_channel_arg_integer_create(
272 const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
273 const grpc_channel_args* new_args =
274 grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
// After the swap args.args holds the augmented copy and new_args holds the
// args that came with the update, which are then destroyed.
275 GPR_SWAP(const grpc_channel_args*, new_args, args.args);
276 grpc_channel_args_destroy(new_args);
277 latest_update_args_ = std::move(args);
278 // If we are not in idle, start connection attempt immediately.
279 // Otherwise, we defer the attempt into ExitIdleLocked().
281 AttemptToConnectUsingLatestUpdateArgsLocked();
// Reacts to a connectivity state change reported for this subchannel.
// The first half handles the currently selected subchannel (promoting a
// pending list, going IDLE with a re-resolution request, or re-reporting
// READY/CONNECTING). The second half handles an unselected subchannel in
// either the current or the pending list.
// NOTE(review): the numbering embedded in this listing skips many lines in
// this function (for example 296, 305-307, 324, 329-330, 337-338, 346,
// 359-363, 377-378, 382, 392, 403-404, 406-407, 416-418). Returns, else
// branches, breaks and log calls are therefore not all visible — confirm
// against the original file before relying on the exact control flow.
285 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
286 grpc_connectivity_state connectivity_state) {
287 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
288 // The notification must be for a subchannel in either the current or
289 // latest pending subchannel lists.
290 GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
291 subchannel_list() == p->latest_pending_subchannel_list_.get());
292 GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
293 // Handle updates for the currently selected subchannel.
294 if (p->selected_ == this) {
295 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
297 "Pick First %p selected subchannel connectivity changed to %s", p,
298 ConnectivityStateName(connectivity_state));
300 // If the new state is anything other than READY and there is a
301 // pending update, switch to the pending update.
302 if (connectivity_state != GRPC_CHANNEL_READY &&
303 p->latest_pending_subchannel_list_ != nullptr) {
304 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
306 "Pick First %p promoting pending subchannel list %p to "
308 p, p->latest_pending_subchannel_list_.get(),
309 p->subchannel_list_.get());
// Deselect this subchannel and stop watching it before the pending list
// takes over as the current list.
311 p->selected_ = nullptr;
312 CancelConnectivityWatchLocked(
313 "selected subchannel failed; switching to pending update");
314 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
315 // Set our state to that of the pending subchannel list.
316 if (p->subchannel_list_->in_transient_failure()) {
317 grpc_error* error = grpc_error_set_int(
318 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
319 "selected subchannel failed; switching to pending update"),
320 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
321 p->channel_control_helper()->UpdateState(
322 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
323 absl::make_unique<TransientFailurePicker>(error));
// NOTE(review): line 324 is not visible; presumably the CONNECTING report
// below is the alternative to the TRANSIENT_FAILURE report above — confirm.
325 p->channel_control_helper()->UpdateState(
326 GRPC_CHANNEL_CONNECTING, absl::Status(),
327 absl::make_unique<QueuePicker>(
328 p->Ref(DEBUG_LOCATION, "QueuePicker")));
331 if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
332 // If the selected subchannel goes bad, request a re-resolution. We
333 // also set the channel state to IDLE. The reason is that if the new
334 // state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want
335 // to connect to the re-resolved backends until we leave IDLE state.
336 // TODO(qianchengz): We may want to request re-resolution in
339 p->channel_control_helper()->RequestReresolution();
340 p->selected_ = nullptr;
341 p->subchannel_list_.reset();
342 p->channel_control_helper()->UpdateState(
343 GRPC_CHANNEL_IDLE, absl::Status(),
344 absl::make_unique<QueuePicker>(
345 p->Ref(DEBUG_LOCATION, "QueuePicker")));
347 // This is unlikely but can happen when a subchannel has been asked
348 // to reconnect by a different channel and this channel has dropped
349 // some connectivity state notifications.
350 if (connectivity_state == GRPC_CHANNEL_READY) {
351 p->channel_control_helper()->UpdateState(
352 GRPC_CHANNEL_READY, absl::Status(),
353 absl::make_unique<Picker>(subchannel()->Ref()));
354 } else { // CONNECTING
355 p->channel_control_helper()->UpdateState(
356 connectivity_state, absl::Status(),
357 absl::make_unique<QueuePicker>(
358 p->Ref(DEBUG_LOCATION, "QueuePicker")));
364 // If we get here, there are two possible cases:
365 // 1. We do not currently have a selected subchannel, and the update is
366 // for a subchannel in p->subchannel_list_ that we're trying to
367 // connect to. The goal here is to find a subchannel that we can
369 // 2. We do currently have a selected subchannel, and the update is
370 // for a subchannel in p->latest_pending_subchannel_list_. The
371 // goal here is to find a subchannel from the update that we can
372 // select in place of the current one.
// Any update for an unselected subchannel first clears the list-wide failure
// flag; the TRANSIENT_FAILURE case below may set it again.
373 subchannel_list()->set_in_transient_failure(false);
374 switch (connectivity_state) {
375 case GRPC_CHANNEL_READY: {
376 ProcessUnselectedReadyLocked();
379 case GRPC_CHANNEL_TRANSIENT_FAILURE: {
380 CancelConnectivityWatchLocked("connection attempt failed");
381 PickFirstSubchannelData* sd = this;
// Index of the subchannel after this one, wrapping back to 0 at the end of
// the list.
// NOTE(review): the declaration of `next_index` (line 382) is not visible.
383 (sd->Index() + 1) % subchannel_list()->num_subchannels();
384 sd = subchannel_list()->subchannel(next_index);
385 // If we've tried all subchannels, set state to TRANSIENT_FAILURE.
386 if (sd->Index() == 0) {
387 // Re-resolve if this is the most recent subchannel list.
388 if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr
389 ? p->latest_pending_subchannel_list_.get()
390 : p->subchannel_list_.get())) {
391 p->channel_control_helper()->RequestReresolution();
393 subchannel_list()->set_in_transient_failure(true);
394 // Only report new state in case 1.
395 if (subchannel_list() == p->subchannel_list_.get()) {
396 grpc_error* error = grpc_error_set_int(
397 GRPC_ERROR_CREATE_FROM_STATIC_STRING(
398 "failed to connect to all addresses"),
399 GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
400 p->channel_control_helper()->UpdateState(
401 GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
402 absl::make_unique<TransientFailurePicker>(error));
// Move on to the next subchannel: check its state, start watching it, and
// either select it or ask it to connect.
405 sd->CheckConnectivityStateAndStartWatchingLocked();
408 case GRPC_CHANNEL_CONNECTING:
409 case GRPC_CHANNEL_IDLE: {
410 // Only update connectivity state in case 1.
411 if (subchannel_list() == p->subchannel_list_.get()) {
412 p->channel_control_helper()->UpdateState(
413 GRPC_CHANNEL_CONNECTING, absl::Status(),
414 absl::make_unique<QueuePicker>(
415 p->Ref(DEBUG_LOCATION, "QueuePicker")));
// SHUTDOWN is excluded by the assertion at the top of the function.
419 case GRPC_CHANNEL_SHUTDOWN:
420 GPR_UNREACHABLE_CODE(break);
// Handles a READY subchannel that is not the selected one: promotes the
// pending list to current when this subchannel belongs to it, reports READY
// with a Picker for this subchannel, and then calls ShutdownLocked() on
// subchannels of this list.
// NOTE(review): lines 447-448, 451-452 and 457 of the embedded numbering are
// not visible. They presumably include the assignment of p->selected_ and a
// condition inside the final loop — confirm against the original file.
424 void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
425 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
426 // If we get here, there are two possible cases:
427 // 1. We do not currently have a selected subchannel, and the update is
428 // for a subchannel in p->subchannel_list_ that we're trying to
429 // connect to. The goal here is to find a subchannel that we can
431 // 2. We do currently have a selected subchannel, and the update is
432 // for a subchannel in p->latest_pending_subchannel_list_. The
433 // goal here is to find a subchannel from the update that we can
434 // select in place of the current one.
435 GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
436 subchannel_list() == p->latest_pending_subchannel_list_.get());
437 // Case 2. Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
438 if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
439 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
441 "Pick First %p promoting pending subchannel list %p to "
443 p, p->latest_pending_subchannel_list_.get(),
444 p->subchannel_list_.get());
446 p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
449 if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
450 gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
// Report READY and install a picker that hands out this subchannel.
453 p->channel_control_helper()->UpdateState(
454 GRPC_CHANNEL_READY, absl::Status(),
455 absl::make_unique<Picker>(subchannel()->Ref()));
// NOTE(review): line 457 is not visible; a condition on `i` may guard the
// ShutdownLocked() call below.
456 for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
458 subchannel_list()->subchannel(i)->ShutdownLocked();
// Reads this subchannel's current connectivity state, starts watching it, and
// then either processes it as an unselected READY subchannel (unless it is
// already the selected one) or asks it to connect.
// NOTE(review): line 476 of the embedded numbering is not visible; per the
// comment below it separates the READY branch from the AttemptToConnect()
// call — confirm against the original file.
463 void PickFirst::PickFirstSubchannelData::
464 CheckConnectivityStateAndStartWatchingLocked() {
465 PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
466 // Check current state.
467 grpc_connectivity_state current_state = CheckConnectivityStateLocked();
469 StartConnectivityWatchLocked();
470 // If current state is READY, select the subchannel now, since we started
471 // watching from this state and will not get a notification of it
472 // transitioning into this state.
473 // If the current state is not READY, attempt to connect.
474 if (current_state == GRPC_CHANNEL_READY) {
475 if (p->selected_ != this) ProcessUnselectedReadyLocked();
477 subchannel()->AttemptToConnect();
481 class PickFirstConfig : public LoadBalancingPolicy::Config {
483 const char* name() const override { return kPickFirst; }
490 class PickFirstFactory : public LoadBalancingPolicyFactory {
492 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
493 LoadBalancingPolicy::Args args) const override {
494 return MakeOrphanable<PickFirst>(std::move(args));
497 const char* name() const override { return kPickFirst; }
499 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
500 const Json& json, grpc_error** /*error*/) const override {
501 return MakeRefCounted<PickFirstConfig>();
507 } // namespace grpc_core
509 void grpc_lb_policy_pick_first_init() {
510 grpc_core::LoadBalancingPolicyRegistry::Builder::
511 RegisterLoadBalancingPolicyFactory(
512 absl::make_unique<grpc_core::PickFirstFactory>());
// Shutdown counterpart of grpc_lb_policy_pick_first_init(); it performs no
// work.
void grpc_lb_policy_pick_first_shutdown() {
  // Nothing to do.
}