Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / lb_policy / pick_first / pick_first.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/support/port_platform.h>
20
21 #include <string.h>
22
23 #include <grpc/support/alloc.h>
24
25 #include "src/core/ext/filters/client_channel/lb_policy/subchannel_list.h"
26 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
27 #include "src/core/ext/filters/client_channel/server_address.h"
28 #include "src/core/ext/filters/client_channel/subchannel.h"
29 #include "src/core/lib/channel/channel_args.h"
30 #include "src/core/lib/gprpp/sync.h"
31 #include "src/core/lib/iomgr/sockaddr_utils.h"
32 #include "src/core/lib/transport/connectivity_state.h"
33 #include "src/core/lib/transport/error_utils.h"
34
35 namespace grpc_core {
36
// Trace flag named "pick_first"; every "Pick First ..." gpr_log statement in
// this file is gated on it.
// NOTE(review): the `false` argument is presumably the initial enabled state
// -- confirm against TraceFlag's constructor.
37 TraceFlag grpc_lb_pick_first_trace(false, "pick_first");
38
39 namespace {
40
41 //
42 // pick_first LB policy
43 //
44
// Name of this LB policy; returned by name() on the policy, its config, and
// its factory.
45 constexpr char kPickFirst[] = "pick_first";
46
// Load-balancing policy named "pick_first".  It keeps at most one selected
// subchannel and, once one is selected, hands the channel a Picker that
// completes every pick with that subchannel.
47 class PickFirst : public LoadBalancingPolicy {
48  public:
49   explicit PickFirst(Args args);
50
51   const char* name() const override { return kPickFirst; }
52
     // Records |args| as the latest update and, unless the policy is idle,
     // immediately builds a subchannel list from it.
53   void UpdateLocked(UpdateArgs args) override;
     // Leaves the idle state and attempts to connect using the latest update.
     // No-op if the policy is shut down or not idle.
54   void ExitIdleLocked() override;
     // Resets backoff on the current and pending subchannel lists, if present.
55   void ResetBackoffLocked() override;
56
57  private:
58   ~PickFirst();
59
60   class PickFirstSubchannelList;
61
     // Per-subchannel entry of a PickFirstSubchannelList.  Its connectivity
     // notifications drive the selection of a subchannel.
62   class PickFirstSubchannelData
63       : public SubchannelData<PickFirstSubchannelList,
64                               PickFirstSubchannelData> {
65    public:
66     PickFirstSubchannelData(
67         SubchannelList<PickFirstSubchannelList, PickFirstSubchannelData>*
68             subchannel_list,
69         const ServerAddress& address,
70         RefCountedPtr<SubchannelInterface> subchannel)
71         : SubchannelData(subchannel_list, address, std::move(subchannel)) {}
72
       // Handles a connectivity state change reported for this subchannel.
73     void ProcessConnectivityChangeLocked(
74         grpc_connectivity_state connectivity_state) override;
75
76     // Processes the connectivity change to READY for an unselected subchannel.
77     void ProcessUnselectedReadyLocked();
78
       // Checks the current connectivity state, starts watching it, and then
       // either selects this subchannel (if READY) or attempts to connect.
79     void CheckConnectivityStateAndStartWatchingLocked();
80   };
81
     // List of subchannels built from one address update.  Holds a ref to the
     // owning policy for its whole lifetime.
82   class PickFirstSubchannelList
83       : public SubchannelList<PickFirstSubchannelList,
84                               PickFirstSubchannelData> {
85    public:
86     PickFirstSubchannelList(PickFirst* policy, TraceFlag* tracer,
87                             ServerAddressList addresses,
88                             const grpc_channel_args& args)
89         : SubchannelList(policy, tracer, std::move(addresses),
90                          policy->channel_control_helper(), args) {
91       // Need to maintain a ref to the LB policy as long as we maintain
92       // any references to subchannels, since the subchannels'
93       // pollset_sets will include the LB policy's pollset_set.
94       policy->Ref(DEBUG_LOCATION, "subchannel_list").release();
95     }
96
97     ~PickFirstSubchannelList() {
         // Drops the policy ref taken (and released to a raw ref) in the
         // constructor.
98       PickFirst* p = static_cast<PickFirst*>(policy());
99       p->Unref(DEBUG_LOCATION, "subchannel_list");
100     }
101
102     bool in_transient_failure() const { return in_transient_failure_; }
103     void set_in_transient_failure(bool in_transient_failure) {
104       in_transient_failure_ = in_transient_failure;
105     }
106
107    private:
        // Set when a connection attempt has failed on the last subchannel of
        // this list (i.e. all of them have been tried); cleared on any later
        // connectivity update for an unselected subchannel of this list.
108     bool in_transient_failure_ = false;
109   };
110
      // Picker that completes every pick with the one subchannel it was
      // constructed with.
111   class Picker : public SubchannelPicker {
112    public:
113     explicit Picker(RefCountedPtr<SubchannelInterface> subchannel)
114         : subchannel_(std::move(subchannel)) {}
115
116     PickResult Pick(PickArgs /*args*/) override {
117       PickResult result;
118       result.type = PickResult::PICK_COMPLETE;
119       result.subchannel = subchannel_;
120       return result;
121     }
122
123    private:
124     RefCountedPtr<SubchannelInterface> subchannel_;
125   };
126
      // Marks the policy as shut down and releases both subchannel lists.
127   void ShutdownLocked() override;
128
      // Builds a subchannel list from latest_update_args_ and installs it as
      // either the current or the pending list.
129   void AttemptToConnectUsingLatestUpdateArgsLocked();
130
131   // Latest update args.
132   UpdateArgs latest_update_args_;
133   // All our subchannels.
134   OrphanablePtr<PickFirstSubchannelList> subchannel_list_;
135   // Latest pending subchannel list.
136   OrphanablePtr<PickFirstSubchannelList> latest_pending_subchannel_list_;
137   // Selected subchannel in \a subchannel_list_.
138   PickFirstSubchannelData* selected_ = nullptr;
139   // Are we in IDLE state?
140   bool idle_ = false;
141   // Are we shut down?
142   bool shutdown_ = false;
143 };
144
145 PickFirst::PickFirst(Args args) : LoadBalancingPolicy(std::move(args)) {
146   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
147     gpr_log(GPR_INFO, "Pick First %p created.", this);
148   }
149 }
150
151 PickFirst::~PickFirst() {
152   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
153     gpr_log(GPR_INFO, "Destroying Pick First %p", this);
154   }
155   GPR_ASSERT(subchannel_list_ == nullptr);
156   GPR_ASSERT(latest_pending_subchannel_list_ == nullptr);
157 }
158
159 void PickFirst::ShutdownLocked() {
160   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
161     gpr_log(GPR_INFO, "Pick First %p Shutting down", this);
162   }
163   shutdown_ = true;
164   subchannel_list_.reset();
165   latest_pending_subchannel_list_.reset();
166 }
167
168 void PickFirst::ExitIdleLocked() {
169   if (shutdown_) return;
170   if (idle_) {
171     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
172       gpr_log(GPR_INFO, "Pick First %p exiting idle", this);
173     }
174     idle_ = false;
175     AttemptToConnectUsingLatestUpdateArgsLocked();
176   }
177 }
178
179 void PickFirst::ResetBackoffLocked() {
180   if (subchannel_list_ != nullptr) subchannel_list_->ResetBackoffLocked();
181   if (latest_pending_subchannel_list_ != nullptr) {
182     latest_pending_subchannel_list_->ResetBackoffLocked();
183   }
184 }
185
// Builds a subchannel list from latest_update_args_ and puts it to use:
//  - empty list: it replaces the current list, the selection is cleared, and
//    TRANSIENT_FAILURE is reported;
//  - some subchannel is already READY: the list becomes the current list and
//    that subchannel is selected right away;
//  - no subchannel is currently selected: the list becomes the current list
//    and a connection attempt starts on its first subchannel;
//  - otherwise: the list becomes the pending list (the selected subchannel
//    stays in use) and a connection attempt starts on its first subchannel.
186 void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
187   // Create a subchannel list from the latest_update_args_.
188   auto subchannel_list = MakeOrphanable<PickFirstSubchannelList>(
189       this, &grpc_lb_pick_first_trace, latest_update_args_.addresses,
190       *latest_update_args_.args);
191   // Empty update or no valid subchannels.
192   if (subchannel_list->num_subchannels() == 0) {
193     // Unsubscribe from all current subchannels.
194     subchannel_list_ = std::move(subchannel_list);  // Empty list.
195     selected_ = nullptr;
196     // Put the channel in TRANSIENT_FAILURE.  There is no idle check here
197     // because this function is never entered while idle_ is set:
198     // UpdateLocked() skips it in that case, and ExitIdleLocked() clears
199     // idle_ before calling it.
200     grpc_error* error =
201         grpc_error_set_int(GRPC_ERROR_CREATE_FROM_STATIC_STRING("Empty update"),
202                            GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
203     channel_control_helper()->UpdateState(
204         GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
205         absl::make_unique<TransientFailurePicker>(error));
206     return;
207   }
208   // If one of the subchannels in the new list is already in state
209   // READY, then select it immediately.  This can happen when the
210   // currently selected subchannel is also present in the update.  It
211   // can also happen if one of the subchannels in the update is already
212   // in the global subchannel pool because it's in use by another channel.
213   for (size_t i = 0; i < subchannel_list->num_subchannels(); ++i) {
214     PickFirstSubchannelData* sd = subchannel_list->subchannel(i);
215     grpc_connectivity_state state = sd->CheckConnectivityStateLocked();
216     if (state == GRPC_CHANNEL_READY) {
          // The new list must be installed as the current list before
          // ProcessUnselectedReadyLocked() runs, since that function asserts
          // that sd belongs to the current or the pending list.
217       subchannel_list_ = std::move(subchannel_list);
218       sd->StartConnectivityWatchLocked();
219       sd->ProcessUnselectedReadyLocked();
220       // If there was a previously pending update (which may or may
221       // not have contained the currently selected subchannel), drop
222       // it, so that it doesn't override what we've done here.
223       latest_pending_subchannel_list_.reset();
224       return;
225     }
226   }
227   if (selected_ == nullptr) {
228     // We don't yet have a selected subchannel, so replace the current
229     // subchannel list immediately.
230     subchannel_list_ = std::move(subchannel_list);
231     // Start watching the first subchannel in the new list and try to
232     // connect to it.
233     // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
234     // here, since we've already checked the initial connectivity
235     // state of all subchannels above.
236     subchannel_list_->subchannel(0)->StartConnectivityWatchLocked();
237     subchannel_list_->subchannel(0)->subchannel()->AttemptToConnect();
238   } else {
239     // We do have a selected subchannel (which means it's READY), so keep
240     // using it until one of the subchannels in the new list reports READY.
        // The block below only logs; an older pending list, if any, is
        // replaced by the assignment that follows it.
241     if (latest_pending_subchannel_list_ != nullptr) {
242       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
243         gpr_log(GPR_INFO,
244                 "Pick First %p Shutting down latest pending subchannel list "
245                 "%p, about to be replaced by newer latest %p",
246                 this, latest_pending_subchannel_list_.get(),
247                 subchannel_list.get());
248       }
249     }
250     latest_pending_subchannel_list_ = std::move(subchannel_list);
251     // Start watching the first subchannel in the new list and try to
252     // connect to it.
253     // Note: No need to use CheckConnectivityStateAndStartWatchingLocked()
254     // here, since we've already checked the initial connectivity
255     // state of all subchannels above.
256     latest_pending_subchannel_list_->subchannel(0)
257         ->StartConnectivityWatchLocked();
258     latest_pending_subchannel_list_->subchannel(0)
259         ->subchannel()
260         ->AttemptToConnect();
261   }
262 }
263
264 void PickFirst::UpdateLocked(UpdateArgs args) {
265   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
266     gpr_log(GPR_INFO,
267             "Pick First %p received update with %" PRIuPTR " addresses", this,
268             args.addresses.size());
269   }
270   // Update the latest_update_args_
271   grpc_arg new_arg = grpc_channel_arg_integer_create(
272       const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
273   const grpc_channel_args* new_args =
274       grpc_channel_args_copy_and_add(args.args, &new_arg, 1);
275   GPR_SWAP(const grpc_channel_args*, new_args, args.args);
276   grpc_channel_args_destroy(new_args);
277   latest_update_args_ = std::move(args);
278   // If we are not in idle, start connection attempt immediately.
279   // Otherwise, we defer the attempt into ExitIdleLocked().
280   if (!idle_) {
281     AttemptToConnectUsingLatestUpdateArgsLocked();
282   }
283 }
284
// Handles a connectivity state change reported for this subchannel.
//
// For the currently selected subchannel:
//  - a non-READY state while a pending list exists promotes the pending list
//    and reports TRANSIENT_FAILURE or CONNECTING depending on that list;
//  - otherwise TRANSIENT_FAILURE puts the policy in IDLE, requests
//    re-resolution and drops the subchannel list;
//  - otherwise the new state is reported to the channel as-is.
//
// For any other subchannel:
//  - READY selects it;
//  - TRANSIENT_FAILURE moves on to the next subchannel of the same list,
//    wrapping around after the last one;
//  - CONNECTING/IDLE reports CONNECTING if the subchannel belongs to the
//    current list.
285 void PickFirst::PickFirstSubchannelData::ProcessConnectivityChangeLocked(
286     grpc_connectivity_state connectivity_state) {
287   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
288   // The notification must be for a subchannel in either the current or
289   // latest pending subchannel lists.
290   GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
291              subchannel_list() == p->latest_pending_subchannel_list_.get());
292   GPR_ASSERT(connectivity_state != GRPC_CHANNEL_SHUTDOWN);
293   // Handle updates for the currently selected subchannel.
294   if (p->selected_ == this) {
295     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
296       gpr_log(GPR_INFO,
297               "Pick First %p selected subchannel connectivity changed to %s", p,
298               ConnectivityStateName(connectivity_state));
299     }
300     // If the new state is anything other than READY and there is a
301     // pending update, switch to the pending update.
302     if (connectivity_state != GRPC_CHANNEL_READY &&
303         p->latest_pending_subchannel_list_ != nullptr) {
304       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
305         gpr_log(GPR_INFO,
306                 "Pick First %p promoting pending subchannel list %p to "
307                 "replace %p",
308                 p, p->latest_pending_subchannel_list_.get(),
309                 p->subchannel_list_.get());
310       }
311       p->selected_ = nullptr;
312       CancelConnectivityWatchLocked(
313           "selected subchannel failed; switching to pending update");
          // This replaces the list that this subchannel belongs to; only p is
          // used from here on.
314       p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
315       // Set our state to that of the pending subchannel list.
316       if (p->subchannel_list_->in_transient_failure()) {
317         grpc_error* error = grpc_error_set_int(
318             GRPC_ERROR_CREATE_FROM_STATIC_STRING(
319                 "selected subchannel failed; switching to pending update"),
320             GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
321         p->channel_control_helper()->UpdateState(
322             GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
323             absl::make_unique<TransientFailurePicker>(error));
324       } else {
325         p->channel_control_helper()->UpdateState(
326             GRPC_CHANNEL_CONNECTING, absl::Status(),
327             absl::make_unique<QueuePicker>(
328                 p->Ref(DEBUG_LOCATION, "QueuePicker")));
329       }
330     } else {
331       if (connectivity_state == GRPC_CHANNEL_TRANSIENT_FAILURE) {
332         // If the selected subchannel goes bad, request a re-resolution. We
333         // also set the channel state to IDLE. The reason is that if the new
334         // state is TRANSIENT_FAILURE due to a GOAWAY reception we don't want
335         // to connect to the re-resolved backends until we leave IDLE state.
336         // TODO(qianchengz): We may want to request re-resolution in
337         // ExitIdleLocked().
338         p->idle_ = true;
339         p->channel_control_helper()->RequestReresolution();
340         p->selected_ = nullptr;
            // This drops the list that this subchannel belongs to; only p is
            // used from here on.
341         p->subchannel_list_.reset();
342         p->channel_control_helper()->UpdateState(
343             GRPC_CHANNEL_IDLE, absl::Status(),
344             absl::make_unique<QueuePicker>(
345                 p->Ref(DEBUG_LOCATION, "QueuePicker")));
346       } else {
347         // This is unlikely but can happen when a subchannel has been asked
348         // to reconnect by a different channel and this channel has dropped
349         // some connectivity state notifications.
350         if (connectivity_state == GRPC_CHANNEL_READY) {
351           p->channel_control_helper()->UpdateState(
352               GRPC_CHANNEL_READY, absl::Status(),
353               absl::make_unique<Picker>(subchannel()->Ref()));
354         } else {  // CONNECTING or IDLE
355           p->channel_control_helper()->UpdateState(
356               connectivity_state, absl::Status(),
357               absl::make_unique<QueuePicker>(
358                   p->Ref(DEBUG_LOCATION, "QueuePicker")));
359         }
360       }
361     }
362     return;
363   }
364   // If we get here, there are two possible cases:
365   // 1. We do not currently have a selected subchannel, and the update is
366   //    for a subchannel in p->subchannel_list_ that we're trying to
367   //    connect to.  The goal here is to find a subchannel that we can
368   //    select.
369   // 2. We do currently have a selected subchannel, and the update is
370   //    for a subchannel in p->latest_pending_subchannel_list_.  The
371   //    goal here is to find a subchannel from the update that we can
372   //    select in place of the current one.
373   subchannel_list()->set_in_transient_failure(false);
374   switch (connectivity_state) {
375     case GRPC_CHANNEL_READY: {
376       ProcessUnselectedReadyLocked();
377       break;
378     }
379     case GRPC_CHANNEL_TRANSIENT_FAILURE: {
380       CancelConnectivityWatchLocked("connection attempt failed");
          // Move on to the next subchannel in the list, wrapping back to
          // index 0 after the last one.
381       PickFirstSubchannelData* sd = this;
382       size_t next_index =
383           (sd->Index() + 1) % subchannel_list()->num_subchannels();
384       sd = subchannel_list()->subchannel(next_index);
385       // If we've tried all subchannels, set state to TRANSIENT_FAILURE.
386       if (sd->Index() == 0) {
387         // Re-resolve if this is the most recent subchannel list.
388         if (subchannel_list() == (p->latest_pending_subchannel_list_ != nullptr
389                                       ? p->latest_pending_subchannel_list_.get()
390                                       : p->subchannel_list_.get())) {
391           p->channel_control_helper()->RequestReresolution();
392         }
393         subchannel_list()->set_in_transient_failure(true);
394         // Only report new state in case 1.
395         if (subchannel_list() == p->subchannel_list_.get()) {
396           grpc_error* error = grpc_error_set_int(
397               GRPC_ERROR_CREATE_FROM_STATIC_STRING(
398                   "failed to connect to all addresses"),
399               GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
400           p->channel_control_helper()->UpdateState(
401               GRPC_CHANNEL_TRANSIENT_FAILURE, grpc_error_to_absl_status(error),
402               absl::make_unique<TransientFailurePicker>(error));
403         }
404       }
          // Keep going with the next subchannel (the first one again after a
          // wrap-around): watch it and either select it or try to connect.
405       sd->CheckConnectivityStateAndStartWatchingLocked();
406       break;
407     }
408     case GRPC_CHANNEL_CONNECTING:
409     case GRPC_CHANNEL_IDLE: {
410       // Only update connectivity state in case 1.
411       if (subchannel_list() == p->subchannel_list_.get()) {
412         p->channel_control_helper()->UpdateState(
413             GRPC_CHANNEL_CONNECTING, absl::Status(),
414             absl::make_unique<QueuePicker>(
415                 p->Ref(DEBUG_LOCATION, "QueuePicker")));
416       }
417       break;
418     }
        // Excluded by the assertion at the top of this function.
419     case GRPC_CHANNEL_SHUTDOWN:
420       GPR_UNREACHABLE_CODE(break);
421   }
422 }
423
// Selects this subchannel.  If it belongs to the pending list, that list is
// first promoted to be the current list.  The subchannel is then recorded as
// selected, READY is reported to the channel together with a Picker for it,
// and ShutdownLocked() is called on every other entry of its list.
424 void PickFirst::PickFirstSubchannelData::ProcessUnselectedReadyLocked() {
425   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
426   // If we get here, there are two possible cases:
427   // 1. We do not currently have a selected subchannel, and the update is
428   //    for a subchannel in p->subchannel_list_ that we're trying to
429   //    connect to.  The goal here is to find a subchannel that we can
430   //    select.
431   // 2. We do currently have a selected subchannel, and the update is
432   //    for a subchannel in p->latest_pending_subchannel_list_.  The
433   //    goal here is to find a subchannel from the update that we can
434   //    select in place of the current one.
435   GPR_ASSERT(subchannel_list() == p->subchannel_list_.get() ||
436              subchannel_list() == p->latest_pending_subchannel_list_.get());
437   // Case 2.  Promote p->latest_pending_subchannel_list_ to p->subchannel_list_.
438   if (subchannel_list() == p->latest_pending_subchannel_list_.get()) {
439     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
440       gpr_log(GPR_INFO,
441               "Pick First %p promoting pending subchannel list %p to "
442               "replace %p",
443               p, p->latest_pending_subchannel_list_.get(),
444               p->subchannel_list_.get());
445     }
446     p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
447   }
448   // Cases 1 and 2.
449   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
450     gpr_log(GPR_INFO, "Pick First %p selected subchannel %p", p, subchannel());
451   }
452   p->selected_ = this;
453   p->channel_control_helper()->UpdateState(
454       GRPC_CHANNEL_READY, absl::Status(),
455       absl::make_unique<Picker>(subchannel()->Ref()));
      // Only the selected subchannel is kept; shut down every other entry of
      // this list.
456   for (size_t i = 0; i < subchannel_list()->num_subchannels(); ++i) {
457     if (i != Index()) {
458       subchannel_list()->subchannel(i)->ShutdownLocked();
459     }
460   }
461 }
462
463 void PickFirst::PickFirstSubchannelData::
464     CheckConnectivityStateAndStartWatchingLocked() {
465   PickFirst* p = static_cast<PickFirst*>(subchannel_list()->policy());
466   // Check current state.
467   grpc_connectivity_state current_state = CheckConnectivityStateLocked();
468   // Start watch.
469   StartConnectivityWatchLocked();
470   // If current state is READY, select the subchannel now, since we started
471   // watching from this state and will not get a notification of it
472   // transitioning into this state.
473   // If the current state is not READY, attempt to connect.
474   if (current_state == GRPC_CHANNEL_READY) {
475     if (p->selected_ != this) ProcessUnselectedReadyLocked();
476   } else {
477     subchannel()->AttemptToConnect();
478   }
479 }
480
481 class PickFirstConfig : public LoadBalancingPolicy::Config {
482  public:
483   const char* name() const override { return kPickFirst; }
484 };
485
486 //
487 // factory
488 //
489
490 class PickFirstFactory : public LoadBalancingPolicyFactory {
491  public:
492   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
493       LoadBalancingPolicy::Args args) const override {
494     return MakeOrphanable<PickFirst>(std::move(args));
495   }
496
497   const char* name() const override { return kPickFirst; }
498
499   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
500       const Json& json, grpc_error** /*error*/) const override {
501     return MakeRefCounted<PickFirstConfig>();
502   }
503 };
504
505 }  // namespace
506
507 }  // namespace grpc_core
508
509 void grpc_lb_policy_pick_first_init() {
510   grpc_core::LoadBalancingPolicyRegistry::Builder::
511       RegisterLoadBalancingPolicyFactory(
512           absl::make_unique<grpc_core::PickFirstFactory>());
513 }
514
// Shutdown hook for the pick_first policy plugin; does nothing.
void grpc_lb_policy_pick_first_shutdown() {
}