3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 /// Implementation of the gRPC LB policy.
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
24 /// An internal channel (\a lb_channel_) is created for the addresses
25 /// that are balancers. This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose). The call
31 /// will be complete when either the balancer sends status or when we cancel
32 /// the call (e.g., because we are shutting down). If needed, we retry the
33 /// call. If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
37 /// We maintain an internal round_robin policy instance for distributing
38 /// requests across backends. Whenever we receive a new serverlist from
39 /// the balancer, we update the round_robin policy with the new list of
40 /// addresses. If we cannot communicate with the balancer on startup,
41 /// however, we may enter fallback mode, in which case we will populate
42 /// the child policy's addresses from the backend addresses returned by the
45 /// Once a child policy instance is in place (and getting updated as described),
46 /// calls for a pick, a ping, or a cancellation will be serviced right
47 /// away by forwarding them to the child policy instance. Any time there's no
48 /// child policy available (i.e., right after the creation of the gRPCLB
49 /// policy), pick requests are queued.
51 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
52 /// high level design and details.
54 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
55 // using that endpoint. Because of various transitive includes in uv.h,
56 // including windows.h on Windows, uv.h must be included before other system
57 // headers. Therefore, sockaddr.h must always be included first.
58 #include <grpc/support/port_platform.h>
60 #include "src/core/lib/iomgr/sockaddr.h"
61 #include "src/core/lib/iomgr/socket_utils.h"
67 #include <grpc/byte_buffer_reader.h>
68 #include <grpc/grpc.h>
69 #include <grpc/support/alloc.h>
70 #include <grpc/support/string_util.h>
71 #include <grpc/support/time.h>
73 #include "src/core/ext/filters/client_channel/client_channel.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
80 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
81 #include "src/core/ext/filters/client_channel/parse_address.h"
82 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
83 #include "src/core/ext/filters/client_channel/server_address.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/host_port.h"
88 #include "src/core/lib/gpr/string.h"
89 #include "src/core/lib/gprpp/manual_constructor.h"
90 #include "src/core/lib/gprpp/memory.h"
91 #include "src/core/lib/gprpp/mutex_lock.h"
92 #include "src/core/lib/gprpp/orphanable.h"
93 #include "src/core/lib/gprpp/ref_counted_ptr.h"
94 #include "src/core/lib/iomgr/combiner.h"
95 #include "src/core/lib/iomgr/sockaddr.h"
96 #include "src/core/lib/iomgr/sockaddr_utils.h"
97 #include "src/core/lib/iomgr/timer.h"
98 #include "src/core/lib/slice/slice_hash_table.h"
99 #include "src/core/lib/slice/slice_internal.h"
100 #include "src/core/lib/slice/slice_string_helpers.h"
101 #include "src/core/lib/surface/call.h"
102 #include "src/core/lib/surface/channel.h"
103 #include "src/core/lib/surface/channel_init.h"
104 #include "src/core/lib/transport/static_metadata.h"
// Tunables for the grpclb policy.
// NOTE(review): the code that consumes these macros is outside this listing;
// the units are taken from the macro names (SECONDS / MS) and the stated
// purposes are assumptions to confirm at the use sites.
// Presumably the parameters of the balancer-call retry back-off.
106 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
107 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
108 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
109 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
// Presumably the default for the fallback-at-startup timeout -- confirm.
110 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
// Key of the pointer channel arg that carries a backend's LB token. It is
// attached to each address in Serverlist::GetServerAddressList() and looked
// up again in Picker::Pick().
112 #define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
114 namespace grpc_core {
// Trace flag registered under the name "glb" and constructed with `false`;
// it gates the tracing gpr_log() calls visible in this file.
116 TraceFlag grpc_lb_glb_trace(false, "glb");
// Policy name returned by GrpcLb::name().
120 constexpr char kGrpclb[] = "grpclb";
// The grpclb load-balancing policy. Besides the policy itself this declares
// four nested helpers:
//  - BalancerCallState: one streaming call to the balancer plus its state.
//  - Serverlist: ref-counted owner of a balancer-provided serverlist.
//  - Picker: wraps a child picker to apply drops and attach LB tokens.
//  - Helper: ChannelControlHelper whose calls are attributed to one child
//    policy (see set_child()).
// NOTE(review): access specifiers, closing braces and a few short
// declarations are not visible in this listing, so member visibility cannot
// be stated from here.
122 class GrpcLb : public LoadBalancingPolicy {
124 explicit GrpcLb(Args args);
// Returns the policy name (kGrpclb).
126 const char* name() const override { return kGrpclb; }
128 void UpdateLocked(UpdateArgs args) override;
129 void ResetBackoffLocked() override;
130 void FillChildRefsForChannelz(
131 channelz::ChildRefsList* child_subchannels,
132 channelz::ChildRefsList* child_channels) override;
135 /// Contains a call to the LB server and all the data related to the call.
136 class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
// Stores the passed-in ref to the owning policy in grpclb_policy_ and creates
// the LB call (see the constructor definition).
138 explicit BalancerCallState(
139 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
141 // It's the caller's responsibility to ensure that Orphan() is called from
142 // inside the combiner.
143 void Orphan() override;
// Raw pointer to the client stats; no ref is transferred. Per the member
// comment on client_stats_, stats are created only after the first
// serverlist is received, so callers check the result for null.
147 GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
// Plain accessors for the corresponding flags.
149 bool seen_initial_response() const { return seen_initial_response_; }
150 bool seen_serverlist() const { return seen_serverlist_; }
153 // So Delete() can access our private dtor.
154 template <typename T>
155 friend void grpc_core::Delete(T*);
157 ~BalancerCallState();
// Downcasts the stored base-class ref to the concrete GrpcLb type.
159 GrpcLb* grpclb_policy() const {
160 return static_cast<GrpcLb*>(grpclb_policy_.get());
163 void ScheduleNextClientLoadReportLocked();
164 void SendClientLoadReportLocked();
// True when every counter in the request's client_stats is zero.
166 static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
// Closure callbacks; `arg` is the BalancerCallState. The closures set up in
// this listing are all scheduled on the owning policy's combiner.
168 static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
169 static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
170 static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
171 static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
172 static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
174 // The owning LB policy.
175 RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
177 // The streaming call to the LB server. Always non-NULL.
178 grpc_call* lb_call_ = nullptr;
180 // recv_initial_metadata
181 grpc_metadata_array lb_initial_metadata_recv_;
// State for the GRPC_OP_SEND_MESSAGE op issued in StartQuery(). A null
// payload is treated as "initial request already sent" by
// MaybeSendClientLoadReportLocked().
184 grpc_byte_buffer* send_message_payload_ = nullptr;
185 grpc_closure lb_on_initial_request_sent_;
// State for the GRPC_OP_RECV_MESSAGE op issued in StartQuery().
188 grpc_byte_buffer* recv_message_payload_ = nullptr;
189 grpc_closure lb_on_balancer_message_received_;
190 bool seen_initial_response_ = false;
191 bool seen_serverlist_ = false;
193 // recv_trailing_metadata
194 grpc_closure lb_on_balancer_status_received_;
195 grpc_metadata_array lb_trailing_metadata_recv_;
196 grpc_status_code lb_call_status_;
197 grpc_slice lb_call_status_details_;
199 // The stats for client-side load reporting associated with this LB call.
200 // Created after the first serverlist is received.
201 RefCountedPtr<GrpcLbClientStats> client_stats_;
// Delay added to "now" when arming client_load_report_timer_.
202 grpc_millis client_stats_report_interval_ = 0;
203 grpc_timer client_load_report_timer_;
// Set when the timer is armed, cleared when its callback runs; Orphan()
// cancels the timer only while this is true.
204 bool client_load_report_timer_callback_pending_ = false;
205 bool last_client_load_report_counters_were_zero_ = false;
// Set when a report became due before the initial request had been sent.
206 bool client_load_report_is_due_ = false;
207 // The closure used for either the load report timer or the callback for
208 // completion of sending the load report.
209 grpc_closure client_load_report_closure_;
// Ref-counted owner of a grpc_grpclb_serverlist; the destructor destroys the
// wrapped list.
212 class Serverlist : public RefCounted<Serverlist> {
214 // Takes ownership of serverlist.
215 explicit Serverlist(grpc_grpclb_serverlist* serverlist)
216 : serverlist_(serverlist) {}
218 ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
220 bool operator==(const Serverlist& other) const;
// Read-only access to the wrapped list; ownership stays with this object.
222 const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
224 // Returns a text representation suitable for logging.
225 UniquePtr<char> AsText() const;
227 // Extracts all non-drop entries into a ServerAddressList.
228 ServerAddressList GetServerAddressList(
229 GrpcLbClientStats* client_stats) const;
231 // Returns true if the serverlist contains at least one drop entry and
232 // no backend address entries.
233 bool ContainsAllDropEntries() const;
235 // Returns the LB token to use for a drop, or null if the call
236 // should not be dropped.
237 // Intended to be called from picker, so calls will be externally
239 const char* ShouldDrop();
242 grpc_grpclb_serverlist* serverlist_;
// Index of the entry ShouldDrop() inspects next; advanced cyclically.
243 size_t drop_index_ = 0;
// Picker handed to the channel when grpclb wants to apply drops and LB-token
// metadata on top of the child policy's picker.
246 class Picker : public SubchannelPicker {
248 Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
249 UniquePtr<SubchannelPicker> child_picker,
250 RefCountedPtr<GrpcLbClientStats> client_stats)
252 serverlist_(std::move(serverlist)),
253 child_picker_(std::move(child_picker)),
254 client_stats_(std::move(client_stats)) {}
256 PickResult Pick(PickArgs* pick, grpc_error** error) override;
// NOTE(review): the declaration the next comment describes (presumably the
// parent_ pointer taken by the constructor and logged in Pick()) is not
// visible in this listing.
259 // Storing the address for logging, but not holding a ref.
263 // Serverlist to be used for determining drops.
264 RefCountedPtr<Serverlist> serverlist_;
// Picker of the child policy; non-dropped picks are forwarded to it.
266 UniquePtr<SubchannelPicker> child_picker_;
// May be null; when set, dropped calls are recorded on it in Pick().
267 RefCountedPtr<GrpcLbClientStats> client_stats_;
// ChannelControlHelper that holds a ref to the GrpcLb policy and filters
// requests according to which child policy (child_) issued them.
270 class Helper : public ChannelControlHelper {
272 explicit Helper(RefCountedPtr<GrpcLb> parent)
273 : parent_(std::move(parent)) {}
275 Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
276 grpc_channel* CreateChannel(const char* target,
277 const grpc_channel_args& args) override;
278 void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
279 UniquePtr<SubchannelPicker> picker) override;
280 void RequestReresolution() override;
// Binds this helper to the child policy it serves. Stored as a raw pointer
// and only compared by address against the parent's child policies.
282 void set_child(LoadBalancingPolicy* child) { child_ = child; }
// Whether child_ is the parent's pending / current child policy.
285 bool CalledByPendingChild() const;
286 bool CalledByCurrentChild() const;
288 RefCountedPtr<GrpcLb> parent_;
289 LoadBalancingPolicy* child_ = nullptr;
294 void ShutdownLocked() override;
296 // Helper functions used in UpdateLocked().
297 void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
298 const grpc_channel_args& args);
299 void ParseLbConfig(Config* grpclb_config);
300 static void OnBalancerChannelConnectivityChangedLocked(void* arg,
302 void CancelBalancerChannelConnectivityWatchLocked();
304 // Methods for dealing with fallback state.
305 void MaybeEnterFallbackModeAfterStartup();
306 static void OnFallbackTimerLocked(void* arg, grpc_error* error);
308 // Methods for dealing with the balancer call.
309 void StartBalancerCallLocked();
310 void StartBalancerCallRetryTimerLocked();
311 static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
313 // Methods for dealing with the child policy.
314 grpc_channel_args* CreateChildPolicyArgsLocked(
315 bool is_backend_from_grpclb_load_balancer);
316 OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
317 const char* name, const grpc_channel_args* args);
318 void CreateOrUpdateChildPolicyLocked();
320 // Who the client is trying to communicate with.
321 const char* server_name_ = nullptr;
323 // Current channel args from the resolver.
324 grpc_channel_args* args_ = nullptr;
// Checked by every Helper callback in this listing (requests are ignored once
// set) and asserted false when a BalancerCallState is constructed.
327 bool shutting_down_ = false;
329 // The channel for communicating with the LB server.
330 grpc_channel* lb_channel_ = nullptr;
331 // Uuid of the lb channel. Used for channelz.
332 gpr_atm lb_channel_uuid_ = 0;
333 // Response generator to inject address updates into lb_channel_.
334 RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
336 // The data associated with the current LB call. It holds a ref to this LB
337 // policy. It's initialized every time we query for backends. It's reset to
338 // NULL whenever the current LB call is no longer needed (e.g., the LB policy
339 // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
340 // contains a non-NULL lb_call_.
341 OrphanablePtr<BalancerCallState> lb_calld_;
342 // Timeout in milliseconds for the LB call. 0 means no deadline.
343 int lb_call_timeout_ms_ = 0;
344 // Balancer call retry state.
345 BackOff lb_call_backoff_;
346 bool retry_timer_callback_pending_ = false;
347 grpc_timer lb_call_retry_timer_;
348 grpc_closure lb_on_call_retry_;
350 // The deserialized response from the balancer. May be nullptr until one
351 // such response has arrived.
352 RefCountedPtr<Serverlist> serverlist_;
354 // Whether we're in fallback mode.
355 bool fallback_mode_ = false;
356 // The backend addresses from the resolver.
357 ServerAddressList fallback_backend_addresses_;
358 // State for fallback-at-startup checks.
359 // Timeout after startup after which we will go into fallback mode if
360 // we have not received a serverlist from the balancer.
361 int fallback_at_startup_timeout_ = 0;
362 bool fallback_at_startup_checks_pending_ = false;
363 grpc_timer lb_fallback_timer_;
364 grpc_closure lb_on_fallback_;
365 grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
366 grpc_closure lb_channel_on_connectivity_changed_;
368 // Lock held when modifying the value of child_policy_ or
369 // pending_child_policy_.
370 gpr_mu child_policy_mu_;
371 // The child policy to use for the backends.
372 OrphanablePtr<LoadBalancingPolicy> child_policy_;
373 // When switching child policies, the new policy will be stored here
374 // until it reports READY, at which point it will be moved to child_policy_.
375 OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
376 // The child policy config.
377 RefCountedPtr<Config> child_policy_config_;
378 // Child policy in state READY.
379 bool child_policy_ready_ = false;
383 // GrpcLb::Serverlist
// Two Serverlist objects compare equal when grpc_grpclb_serverlist_equals()
// reports that the serverlists they wrap are equal.
386 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
387 return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
// Converts a balancer-provided server entry into a grpc_resolved_address.
//
// `addr` is zeroed unconditionally first; for a drop entry the function then
// returns, leaving it zeroed. Otherwise the port is converted with
// grpc_htons() and the raw address bytes are copied into either a
// grpc_sockaddr_in or, for 16-byte addresses, a grpc_sockaddr_in6.
//
// NOTE(review): the condition guarding the grpc_sockaddr_in branch is not
// visible in this listing (presumably `ip->size == 4` -- confirm). The memcpy
// length is ip->size, so the branch conditions are what keep the copy within
// the destination field.
390 void ParseServer(const grpc_grpclb_server* server,
391 grpc_resolved_address* addr) {
392 memset(addr, 0, sizeof(*addr));
393 if (server->drop) return;
// The port is narrowed to 16 bits here; IsServerValid() is the function that
// rejects ports with any higher bits set.
394 const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
395 /* the addresses are given in binary format (a in(6)_addr struct) in
396 * server->ip_address.bytes. */
397 const grpc_grpclb_ip_address* ip = &server->ip_address;
// IPv4 case.
399 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
400 grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
401 addr4->sin_family = GRPC_AF_INET;
402 memcpy(&addr4->sin_addr, ip->bytes, ip->size);
403 addr4->sin_port = netorder_port;
// IPv6 case.
404 } else if (ip->size == 16) {
405 addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
406 grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
407 addr6->sin6_family = GRPC_AF_INET6;
408 memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
409 addr6->sin6_port = netorder_port;
// Builds a text dump of the serverlist for logging: one line per entry, made
// of the entry index, either the literal "(drop)" or the address rendered by
// grpc_sockaddr_to_string(), and the entry's load_balance_token. The
// per-entry strings are collected in a gpr_strvec and flattened into the
// returned buffer.
//
// NOTE(review): several statements of this function are not visible in this
// listing (the declarations of `entries`, `ipport` and `entry`, the branch on
// server->drop, any freeing of `ipport`, and the return). Confirm ownership
// of the intermediate strings against the full source.
413 UniquePtr<char> GrpcLb::Serverlist::AsText() const {
415 gpr_strvec_init(&entries);
416 for (size_t i = 0; i < serverlist_->num_servers; ++i) {
417 const auto* server = serverlist_->servers[i];
// Drop entries carry no address, so a fixed marker is printed instead.
420 ipport = gpr_strdup("(drop)");
422 grpc_resolved_address addr;
423 ParseServer(server, &addr);
424 grpc_sockaddr_to_string(&ipport, &addr, false);
427 gpr_asprintf(&entry, " %" PRIuPTR ": %s token=%s\n", i, ipport,
428 server->load_balance_token);
430 gpr_strvec_add(&entries, entry);
432 UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
433 gpr_strvec_destroy(&entries);
437 // vtable for LB token channel arg.
// Copy hook of the LB-token pointer arg. The pointer value is the payload of
// a grpc_mdelem: a non-null token is re-wrapped as a grpc_mdelem, a ref is
// taken with GRPC_MDELEM_REF, and the payload is returned as the "copy".
// NOTE(review): the branch taken for a null token is not visible in this
// listing (presumably it yields nullptr -- confirm).
438 void* lb_token_copy(void* token) {
439 return token == nullptr
441 : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
// Destroy hook of the LB-token pointer arg: releases one mdelem ref for a
// non-null token; a null token is a no-op.
443 void lb_token_destroy(void* token) {
444 if (token != nullptr) {
445 GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
// Compare hook of the LB-token pointer arg. Neither argument is inspected in
// the lines visible here.
// NOTE(review): the return statement is not visible in this listing; per the
// comment below it reports "equal" for every pair of tokens.
448 int lb_token_cmp(void* token1, void* token2) {
449 // Always indicate a match, since we don't want this channel arg to
450 // affect the subchannel's key in the index.
// Vtable (copy, destroy, compare) for the LB-token pointer arg; passed to
// grpc_channel_arg_pointer_create() in Serverlist::GetServerAddressList().
453 const grpc_arg_pointer_vtable lb_token_arg_vtable = {
454 lb_token_copy, lb_token_destroy, lb_token_cmp};
// Reports whether `server` can be used as a backend address.
//  - Drop entries are never valid.
//  - The port must fit in 16 bits (no bits set above bit 15).
//  - The IP address must be exactly 4 or 16 bytes long.
// `idx` is the entry's position in the serverlist and only appears in the
// diagnostic messages.
// NOTE(review): the gpr_log() call heads, the use of the `log` parameter and
// the return statements following each check are not visible in this
// listing; presumably `log` gates the messages -- confirm.
456 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
457 if (server->drop) return false;
458 const grpc_grpclb_ip_address* ip = &server->ip_address;
459 if (GPR_UNLIKELY(server->port >> 16 != 0)) {
462 "Invalid port '%d' at index %lu of serverlist. Ignoring.",
463 server->port, (unsigned long)idx);
467 if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
470 "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
471 "serverlist. Ignoring",
472 ip->size, (unsigned long)idx);
479 // Returns addresses extracted from the serverlist.
// Entries rejected by IsServerValid() -- which includes all drop entries --
// are skipped. Every remaining entry is appended to the result together with
// a channel-args set holding a single pointer arg,
// GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN, whose value is the payload of that
// entry's LB-token mdelem.
//
// If `client_stats` is non-null, a new ref to it is attached as user data to
// each token mdelem built from a server-provided token; Picker::Pick() reads
// it back through grpc_mdelem_get_user_data().
// NOTE(review): the `else` head, the gpr_log() call head and any cleanup of
// `uri` in the missing-token branch, plus the final return, are not visible
// in this listing.
480 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
481 GrpcLbClientStats* client_stats) const {
482 ServerAddressList addresses;
483 for (size_t i = 0; i < serverlist_->num_servers; ++i) {
484 const grpc_grpclb_server* server = serverlist_->servers[i];
// Validation is requested with log == false here.
485 if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
486 // Address processing.
487 grpc_resolved_address addr;
488 ParseServer(server, &addr);
489 // LB token processing.
490 grpc_mdelem lb_token;
491 if (server->has_load_balance_token) {
// load_balance_token is a fixed-size array; strnlen() bounds the length in
// case the token fills the array without a terminating NUL.
492 const size_t lb_token_max_length =
493 GPR_ARRAY_SIZE(server->load_balance_token);
494 const size_t lb_token_length =
495 strnlen(server->load_balance_token, lb_token_max_length);
496 grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
497 server->load_balance_token, lb_token_length);
498 lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
499 if (client_stats != nullptr) {
// The released ref becomes owned by the mdelem's user data, with
// GrpcLbClientStats::Destroy registered alongside it. The assert checks that
// the value reported back is this same client_stats object.
500 GPR_ASSERT(grpc_mdelem_set_user_data(
501 lb_token, GrpcLbClientStats::Destroy,
502 client_stats->Ref().release()) == client_stats);
// No token from the balancer: report it and fall back to the empty token.
505 char* uri = grpc_sockaddr_to_uri(&addr);
507 "Missing LB token for backend address '%s'. The empty token will "
511 lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
514 grpc_arg arg = grpc_channel_arg_pointer_create(
515 const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
516 (void*)lb_token.payload, &lb_token_arg_vtable);
517 grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
518 addresses.emplace_back(addr, args);
// Drop the local mdelem ref. Presumably the copied channel args hold their
// own ref, taken through lb_token_copy -- confirm.
520 GRPC_MDELEM_UNREF(lb_token);
// Tests whether the serverlist consists solely of drop entries. An empty
// serverlist does not qualify, and the scan returns false at the first
// non-drop entry.
// NOTE(review): the statement reached when the loop finishes is not visible
// in this listing; presumably it returns true -- confirm.
525 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
526 if (serverlist_->num_servers == 0) return false;
527 for (size_t i = 0; i < serverlist_->num_servers; ++i) {
528 if (!serverlist_->servers[i]->drop) return false;
// Decides whether the next call should be dropped. Each invocation looks at
// the serverlist entry at drop_index_ and then advances the index by one,
// wrapping at num_servers, so successive calls cycle through the list.
// Returns the entry's load_balance_token when it is a drop entry and nullptr
// otherwise; an empty serverlist never drops.
// The returned pointer refers to storage inside the serverlist entry.
533 const char* GrpcLb::Serverlist::ShouldDrop() {
// Guard first: the modulo below would divide by zero on an empty list.
534 if (serverlist_->num_servers == 0) return nullptr;
535 grpc_grpclb_server* server = serverlist_->servers[drop_index_];
536 drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
537 return server->drop ? server->load_balance_token : nullptr;
// Performs one pick on behalf of grpclb.
//  1. Asks the serverlist whether this call should be dropped. If so, the
//     drop is recorded in client_stats_ (when present) and the pick completes
//     without the child picker being consulted.
//  2. Otherwise the pick is delegated to child_picker_.
//  3. If the child completed the pick with a connected subchannel, the LB
//     token stored in that subchannel's channel args is added to
//     pick->initial_metadata, and the client stats attached to the token (if
//     any) count the call as started.
// NOTE(review): the handling after the "No LB token" log message, the
// gpr_log() call head and the final return are not visible in this listing.
544 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
545 // Check if we should drop the call.
546 const char* drop_token = serverlist_->ShouldDrop();
547 if (drop_token != nullptr) {
548 // Update client load reporting stats to indicate the number of
549 // dropped calls. Note that we have to do this here instead of in
550 // the client_load_reporting filter, because we do not create a
551 // subchannel call (and therefore no client_load_reporting filter)
552 // for dropped calls.
553 if (client_stats_ != nullptr) {
554 client_stats_->AddCallDroppedLocked(drop_token);
556 return PICK_COMPLETE;
558 // Forward pick to child policy.
559 PickResult result = child_picker_->Pick(pick, error);
560 // If pick succeeded, add LB token to initial metadata.
561 if (result == PickResult::PICK_COMPLETE &&
562 pick->connected_subchannel != nullptr) {
563 const grpc_arg* arg = grpc_channel_args_find(
564 pick->connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
565 if (arg == nullptr) {
567 "[grpclb %p picker %p] No LB token for connected subchannel "
569 parent_, this, pick);
// The arg's pointer value is the mdelem payload stored by
// GetServerAddressList(); rebuild the mdelem handle from it.
572 grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
573 GPR_ASSERT(!GRPC_MDISNULL(lb_token));
// The metadata batch receives its own ref to the token.
574 GPR_ASSERT(grpc_metadata_batch_add_tail(
575 pick->initial_metadata, &pick->lb_token_mdelem_storage,
576 GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
// Stats are looked up on the token itself rather than on client_stats_.
577 GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
578 grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
579 if (client_stats != nullptr) {
580 client_stats->AddCallStarted();
// True when the child policy bound to this helper is the parent's
// pending_child_policy_. Asserts that child_ is non-null.
590 bool GrpcLb::Helper::CalledByPendingChild() const {
591 GPR_ASSERT(child_ != nullptr);
592 return child_ == parent_->pending_child_policy_.get();
// True when the child policy bound to this helper is the parent's current
// child_policy_. Asserts that child_ is non-null.
595 bool GrpcLb::Helper::CalledByCurrentChild() const {
596 GPR_ASSERT(child_ != nullptr);
597 return child_ == parent_->child_policy_.get();
// Forwards subchannel creation to the parent policy's
// channel_control_helper(). The request is not forwarded when the parent is
// shutting down or when the caller is neither the pending nor the current
// child policy.
// NOTE(review): the body of the rejecting branch is not visible in this
// listing (presumably it returns nullptr -- confirm).
600 Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
601 if (parent_->shutting_down_ ||
602 (!CalledByPendingChild() && !CalledByCurrentChild())) {
605 return parent_->channel_control_helper()->CreateSubchannel(args);
// Forwards channel creation to the parent policy's channel_control_helper(),
// using the same gate as CreateSubchannel(): not forwarded while the parent
// is shutting down or when the caller is neither the pending nor the current
// child policy.
// NOTE(review): the body of the rejecting branch is not visible in this
// listing (presumably it returns nullptr -- confirm).
608 grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
609 const grpc_channel_args& args) {
610 if (parent_->shutting_down_ ||
611 (!CalledByPendingChild() && !CalledByCurrentChild())) {
614 return parent_->channel_control_helper()->CreateChannel(target, args);
// Handles a connectivity-state / picker update reported by a child policy.
//
// Filtering (state_error is unreffed on every path that discards the update):
//  - parent shutting down: discarded;
//  - update from the pending child: discarded unless the state is READY; on
//    READY the pending child is promoted to child_policy_ under
//    child_policy_mu_;
//  - update from a child that is neither pending nor current: discarded.
// For an accepted update the function records whether the child is READY,
// runs the fallback-at-startup check, and forwards the state to the parent's
// channel_control_helper() with either the child's picker unchanged or a
// GrpcLb::Picker wrapping it (see the case analysis below).
//
// NOTE(review): the `return` statements of the discard paths, the gpr_log()
// call heads and some trailing arguments are not visible in this listing.
617 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
618 grpc_error* state_error,
619 UniquePtr<SubchannelPicker> picker) {
620 if (parent_->shutting_down_) {
621 GRPC_ERROR_UNREF(state_error);
624 // If this request is from the pending child policy, ignore it until
625 // it reports READY, at which point we swap it into place.
626 if (CalledByPendingChild()) {
627 if (grpc_lb_glb_trace.enabled()) {
629 "[grpclb %p helper %p] pending child policy %p reports state=%s",
630 parent_.get(), this, parent_->pending_child_policy_.get(),
631 grpc_connectivity_state_name(state));
633 if (state != GRPC_CHANNEL_READY) {
634 GRPC_ERROR_UNREF(state_error);
// Unlink the outgoing child's interested_parties() from the parent's before
// the child is replaced. This dereferences child_policy_, so it relies on a
// current child existing whenever a pending one does.
637 grpc_pollset_set_del_pollset_set(
638 parent_->child_policy_->interested_parties(),
639 parent_->interested_parties());
// The swap itself happens under child_policy_mu_, matching the member
// comment on that mutex.
640 MutexLock lock(&parent_->child_policy_mu_);
641 parent_->child_policy_ = std::move(parent_->pending_child_policy_);
642 } else if (!CalledByCurrentChild()) {
643 // This request is from an outdated child, so ignore it.
644 GRPC_ERROR_UNREF(state_error);
647 // Record whether child policy reports READY.
648 parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
649 // Enter fallback mode if needed.
650 parent_->MaybeEnterFallbackModeAfterStartup();
651 // There are three cases to consider here:
652 // 1. We're in fallback mode. In this case, we're always going to use
653 // the child policy's result, so we pass its picker through as-is.
654 // 2. The serverlist contains only drop entries. In this case, we
655 // want to use our own picker so that we can return the drops.
656 // 3. Not in fallback mode and serverlist is not all drops (i.e., it
657 // may be empty or contain at least one backend address). There are
659 // a. The child policy is reporting state READY. In this case, we wrap
660 // the child's picker in our own, so that we can handle drops and LB
661 // token metadata for each pick.
662 // b. The child policy is reporting a state other than READY. In this
663 // case, we don't want to use our own picker, because we don't want
664 // to process drops for picks that yield a QUEUE result; this would
665 // result in dropping too many calls, since we will see the
666 // queued picks multiple times, and we'd consider each one a
667 // separate call for the drop calculation.
669 // Cases 1 and 3b: return picker from the child policy as-is.
// NOTE(review): case 1 is detected through serverlist_ == nullptr rather than
// fallback_mode_; presumably serverlist_ is cleared on entering fallback mode
// -- confirm against MaybeEnterFallbackModeAfterStartup().
670 if (parent_->serverlist_ == nullptr ||
671 (!parent_->serverlist_->ContainsAllDropEntries() &&
672 state != GRPC_CHANNEL_READY)) {
673 if (grpc_lb_glb_trace.enabled()) {
675 "[grpclb %p helper %p] state=%s passing child picker %p as-is",
676 parent_.get(), this, grpc_connectivity_state_name(state),
679 parent_->channel_control_helper()->UpdateState(state, state_error,
683 // Cases 2 and 3a: wrap picker from the child in our own picker.
684 if (grpc_lb_glb_trace.enabled()) {
685 gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
686 parent_.get(), this, grpc_connectivity_state_name(state),
// The wrapping picker gets its own ref to the current call's client stats,
// if there is a balancer call and it has stats; otherwise it gets null.
689 RefCountedPtr<GrpcLbClientStats> client_stats;
690 if (parent_->lb_calld_ != nullptr &&
691 parent_->lb_calld_->client_stats() != nullptr) {
692 client_stats = parent_->lb_calld_->client_stats()->Ref();
694 parent_->channel_control_helper()->UpdateState(
696 UniquePtr<SubchannelPicker>(
697 New<Picker>(parent_.get(), parent_->serverlist_, std::move(picker),
698 std::move(client_stats))));
// Handles a re-resolution request from a child policy.
// The request is discarded while the parent is shutting down, and also when
// it does not come from the latest child policy (the pending child if one
// exists, otherwise the current child). It is passed up to the parent's
// channel_control_helper() only when there is no balancer call, or the
// balancer call has not yet seen an initial response.
701 void GrpcLb::Helper::RequestReresolution() {
702 if (parent_->shutting_down_) return;
703 const LoadBalancingPolicy* latest_child_policy =
704 parent_->pending_child_policy_ != nullptr
705 ? parent_->pending_child_policy_.get()
706 : parent_->child_policy_.get();
707 if (child_ != latest_child_policy) return;
708 if (grpc_lb_glb_trace.enabled()) {
710 "[grpclb %p] Re-resolution requested from %schild policy (%p).",
711 parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
713 // If we are talking to a balancer, we expect to get updated addresses
714 // from the balancer, so we can ignore the re-resolution request from
715 // the child policy. Otherwise, pass the re-resolution request up to the
717 if (parent_->lb_calld_ == nullptr ||
718 !parent_->lb_calld_->seen_initial_response()) {
719 parent_->channel_control_helper()->RequestReresolution();
724 // GrpcLb::BalancerCallState
// Sets up the state for one streaming call to the balancer.
//  - Keeps the passed-in ref to the owning policy. The policy must be
//    non-null, must not be shutting down, and must have a non-empty
//    server_name_ (all asserted).
//  - Creates lb_call_ on the policy's lb_channel_, polled through the
//    policy's interested_parties(). The deadline is now + lb_call_timeout_ms_,
//    or infinite when the timeout is 0.
//  - Encodes the initial request, built from server_name_, into
//    send_message_payload_.
//  - Initializes both metadata arrays and the three call closures, each
//    scheduled on the policy's combiner.
// The call is only created here; the batches are started in StartQuery().
727 GrpcLb::BalancerCallState::BalancerCallState(
728 RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
729 : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace),
730 grpclb_policy_(std::move(parent_grpclb_policy)) {
731 GPR_ASSERT(grpclb_policy_ != nullptr);
732 GPR_ASSERT(!grpclb_policy()->shutting_down_);
733 // Init the LB call. Note that the LB call will progress every time there's
734 // activity in grpclb_policy_->interested_parties(), which is comprised of
735 // the polling entities from client_channel.
736 GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
737 GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
738 const grpc_millis deadline =
739 grpclb_policy()->lb_call_timeout_ms_ == 0
740 ? GRPC_MILLIS_INF_FUTURE
741 : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
742 lb_call_ = grpc_channel_create_pollset_set_call(
743 grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
744 grpclb_policy_->interested_parties(),
745 GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
746 nullptr, deadline, nullptr);
747 // Init the LB call request payload.
748 grpc_grpclb_request* request =
749 grpc_grpclb_request_create(grpclb_policy()->server_name_);
750 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
751 send_message_payload_ =
752 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
// The slice and the request object are released as soon as the byte buffer
// has been created from them.
753 grpc_slice_unref_internal(request_payload_slice);
754 grpc_grpclb_request_destroy(request);
755 // Init other data associated with the LB call.
756 grpc_metadata_array_init(&lb_initial_metadata_recv_);
757 grpc_metadata_array_init(&lb_trailing_metadata_recv_);
758 GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
759 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
760 GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
761 OnBalancerMessageReceivedLocked, this,
762 grpc_combiner_scheduler(grpclb_policy()->combiner()));
763 GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
764 OnBalancerStatusReceivedLocked, this,
765 grpc_combiner_scheduler(grpclb_policy()->combiner()));
// Releases everything owned by the call state: the call itself (asserted to
// be non-null), both metadata arrays, the send and receive byte buffers, and
// the status-details slice.
768 GrpcLb::BalancerCallState::~BalancerCallState() {
769 GPR_ASSERT(lb_call_ != nullptr);
770 grpc_call_unref(lb_call_);
771 grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
772 grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
773 grpc_byte_buffer_destroy(send_message_payload_);
774 grpc_byte_buffer_destroy(recv_message_payload_);
775 grpc_slice_unref_internal(lb_call_status_details_);
// Called when the owner gives up this call state. Cancels the LB call and,
// if a load-report timer callback is pending, cancels that timer too. No ref
// is released in the lines visible here; see the note at the end of the body.
778 void GrpcLb::BalancerCallState::Orphan() {
779 GPR_ASSERT(lb_call_ != nullptr);
780 // If we are here because grpclb_policy wants to cancel the call,
781 // lb_on_balancer_status_received_ will complete the cancellation and clean
782 // up. Otherwise, we are here because grpclb_policy has to orphan a failed
783 // call, then the following cancellation will be a no-op.
784 grpc_call_cancel(lb_call_, nullptr);
785 if (client_load_report_timer_callback_pending_) {
786 grpc_timer_cancel(&client_load_report_timer_);
788 // Note that the initial ref is held by lb_on_balancer_status_received_
789 // instead of the caller of this function. So the corresponding unref happens
790 // in lb_on_balancer_status_received_ instead of here.
// Starts the LB call by issuing three batches on lb_call_:
//  1. send initial metadata (wait-for-ready explicitly set) plus the initial
//     request message; completion runs lb_on_initial_request_sent_;
//  2. receive initial metadata plus the first response message; completion
//     runs lb_on_balancer_message_received_;
//  3. receive the call status; completion runs
//     lb_on_balancer_status_received_.
// Every batch start is asserted to return GRPC_CALL_OK. A ref is taken before
// each of the first two batches; the third relies on the initial ref, as the
// comment near the end explains.
// NOTE(review): the declarations of `ops` / `op`, the statements advancing
// `op` between ops, the flags assignments and whatever is done with `self`
// after each Ref() are not visible in this listing.
793 void GrpcLb::BalancerCallState::StartQuery() {
794 GPR_ASSERT(lb_call_ != nullptr);
795 if (grpc_lb_glb_trace.enabled()) {
796 gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
797 grpclb_policy_.get(), this, lb_call_);
800 grpc_call_error call_error;
802 memset(ops, 0, sizeof(ops));
803 // Op: send initial metadata.
805 op->op = GRPC_OP_SEND_INITIAL_METADATA;
806 op->data.send_initial_metadata.count = 0;
807 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
808 GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
809 op->reserved = nullptr;
811 // Op: send request message.
812 GPR_ASSERT(send_message_payload_ != nullptr);
813 op->op = GRPC_OP_SEND_MESSAGE;
814 op->data.send_message.send_message = send_message_payload_;
816 op->reserved = nullptr;
818 // TODO(roth): We currently track this ref manually. Once the
819 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
820 // with the callback.
821 auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
// The op count is the distance `op` has advanced from the start of `ops`.
823 call_error = grpc_call_start_batch_and_execute(
824 lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
825 GPR_ASSERT(GRPC_CALL_OK == call_error);
826 // Op: recv initial metadata.
828 op->op = GRPC_OP_RECV_INITIAL_METADATA;
829 op->data.recv_initial_metadata.recv_initial_metadata =
830 &lb_initial_metadata_recv_;
832 op->reserved = nullptr;
834 // Op: recv response.
835 op->op = GRPC_OP_RECV_MESSAGE;
836 op->data.recv_message.recv_message = &recv_message_payload_;
838 op->reserved = nullptr;
840 // TODO(roth): We currently track this ref manually. Once the
841 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
842 // with the callback.
843 self = Ref(DEBUG_LOCATION, "on_message_received");
845 call_error = grpc_call_start_batch_and_execute(
846 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
847 GPR_ASSERT(GRPC_CALL_OK == call_error);
848 // Op: recv server status.
850 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
851 op->data.recv_status_on_client.trailing_metadata =
852 &lb_trailing_metadata_recv_;
853 op->data.recv_status_on_client.status = &lb_call_status_;
854 op->data.recv_status_on_client.status_details = &lb_call_status_details_;
856 op->reserved = nullptr;
858 // This callback signals the end of the LB call, so it relies on the initial
859 // ref instead of a new ref. When it's invoked, it's the initial ref that is
861 call_error = grpc_call_start_batch_and_execute(
862 lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
863 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Arms client_load_report_timer_ for the next client load report. The
// deadline is the current ExecCtx time plus client_stats_report_interval_.
// When the timer callback runs, MaybeSendClientLoadReportLocked() is invoked
// on the policy's combiner with this BalancerCallState as its argument.
866 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
867 const grpc_millis next_client_load_report_time =
868 ExecCtx::Get()->Now() + client_stats_report_interval_;
// The same closure object is re-initialized by SendClientLoadReportLocked()
// to point at ClientLoadReportDoneLocked(), so it must be re-bound here.
869 GRPC_CLOSURE_INIT(&client_load_report_closure_,
870 MaybeSendClientLoadReportLocked, this,
871 grpc_combiner_scheduler(grpclb_policy()->combiner()));
872 grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
873 &client_load_report_closure_);
// Records that a timer callback is outstanding; cleared again in
// MaybeSendClientLoadReportLocked().
874 client_load_report_timer_callback_pending_ = true;
// Callback for the client load report timer.
//
// \a arg is the BalancerCallState that armed the timer; \a error is the
// timer's completion status. The "client_load_report" ref is dropped when the
// timer reports an error or when this call is no longer the policy's current
// lb_calld_. Otherwise the report is either sent right away or flagged as due.
// NOTE(review): this listing omits the lines following the Unref() and
// between the two branches of the final `if`; presumably they hold an early
// `return` and an `else` -- confirm against the full source.
877 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
878 void* arg, grpc_error* error) {
879 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
880 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
881 lb_calld->client_load_report_timer_callback_pending_ = false;
882 if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
883 lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
886 // If we've already sent the initial request, then we can go ahead and send
887 // the load report. Otherwise, we need to wait until the initial request has
888 // been sent to send this (see OnInitialRequestSentLocked()).
// send_message_payload_ is non-null while a send op is still in flight.
889 if (lb_calld->send_message_payload_ == nullptr) {
890 lb_calld->SendClientLoadReportLocked();
892 lb_calld->client_load_report_is_due_ = true;
// Returns true when \a request carries no load data: the started, finished,
// client-failed-to-send and known-received counters are all compared against
// zero, and the dropped-call list (stored as an opaque pointer in
// calls_finished_with_drop.arg) is either null or empty.
896 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
897 grpc_grpclb_request* request) {
898 GrpcLbClientStats::DroppedCallCounts* drop_entries =
899 static_cast<GrpcLbClientStats::DroppedCallCounts*>(
900 request->client_stats.calls_finished_with_drop.arg);
901 return request->client_stats.num_calls_started == 0 &&
902 request->client_stats.num_calls_finished == 0 &&
903 request->client_stats.num_calls_finished_with_client_failed_to_send ==
905 request->client_stats.num_calls_finished_known_received == 0 &&
906 (drop_entries == nullptr || drop_entries->size() == 0);
// Builds a load report request from client_stats_ and sends it on lb_call_
// as a single GRPC_OP_SEND_MESSAGE batch. Completion is delivered to
// ClientLoadReportDoneLocked() on the policy's combiner.
//
// Precondition (asserted): no other send is in flight, i.e.
// send_message_payload_ is null.
// If this report and the previous one both have all-zero counters, nothing is
// sent and the next report is scheduled instead.
909 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
910 // Construct message payload.
911 GPR_ASSERT(send_message_payload_ == nullptr);
912 grpc_grpclb_request* request =
913 grpc_grpclb_load_report_request_create_locked(client_stats_.get());
914 // Skip client load report if the counters were all zero in the last
915 // report and they are still zero in this one.
916 if (LoadReportCountersAreZero(request)) {
917 if (last_client_load_report_counters_were_zero_) {
// The request is not sent on this path, so it is destroyed here.
918 grpc_grpclb_request_destroy(request);
919 ScheduleNextClientLoadReportLocked();
922 last_client_load_report_counters_were_zero_ = true;
924 last_client_load_report_counters_were_zero_ = false;
// Encode the request into a slice and wrap it in a byte buffer; the local
// slice ref and the request are released once the byte buffer is created.
926 grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
927 send_message_payload_ =
928 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
929 grpc_slice_unref_internal(request_payload_slice);
930 grpc_grpclb_request_destroy(request);
933 memset(&op, 0, sizeof(op));
934 op.op = GRPC_OP_SEND_MESSAGE;
935 op.data.send_message.send_message = send_message_payload_;
// Re-point the shared closure (also used for the timer) at the
// send-completion callback.
936 GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
937 this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
938 grpc_call_error call_error = grpc_call_start_batch_and_execute(
939 lb_call_, &op, 1, &client_load_report_closure_);
// A failure to start the batch is logged and then treated as fatal via the
// assertion below.
940 if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
942 "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
943 grpclb_policy_.get(), this, call_error);
944 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Completion callback for the load report send started in
// SendClientLoadReportLocked(). \a arg is the BalancerCallState.
//
// Frees the sent payload. If the send failed or this call is no longer the
// policy's current lb_calld_, the "client_load_report" ref is dropped;
// otherwise the next report is scheduled.
// NOTE(review): the second line of the signature (declaring `error`) and the
// line after the Unref() are omitted from this listing; presumably the latter
// is an early `return` -- confirm against the full source.
948 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
950 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
951 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
// Clearing the payload pointer marks that no send is in flight any more.
952 grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
953 lb_calld->send_message_payload_ = nullptr;
954 if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
955 lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
958 lb_calld->ScheduleNextClientLoadReportLocked();
// Completion callback for the initial request batch. \a arg is the
// BalancerCallState.
//
// Frees the initial request payload, sends a client load report that became
// due while the initial request was still in flight (only if this call is
// still the policy's current lb_calld_), and finally drops the
// "on_initial_request_sent" ref.
961 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
963 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
// Must be cleared before SendClientLoadReportLocked(), which asserts that the
// payload pointer is null.
964 grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
965 lb_calld->send_message_payload_ = nullptr;
966 // If we attempted to send a client load report before the initial request was
967 // sent (and this lb_calld is still in use), send the load report now.
968 if (lb_calld->client_load_report_is_due_ &&
969 lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
970 lb_calld->SendClientLoadReportLocked();
971 lb_calld->client_load_report_is_due_ = false;
973 lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
// Callback invoked when a message has been received on the balancer call.
// \a arg is the BalancerCallState.
//
// The received payload is read into a slice and interpreted as either
//  - the initial response (only while seen_initial_response_ is false), which
//    may carry the client load reporting interval, or
//  - a serverlist, which is compared with the policy's current serverlist_
//    and, if different, installed and pushed to the child policy.
// Anything else is handled by the final branch, whose log message reports an
// invalid LB response.
// Afterwards, unless the policy is shutting down, another
// GRPC_OP_RECV_MESSAGE batch is started that reuses the same closure (and
// therefore the same "on_message_received" ref).
// NOTE(review): this listing omits short lines (closing braces, `return;`,
// `else`, the `gpr_log(` openers and some trailing arguments), so the exact
// branch structure must be confirmed against the full source.
976 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
977 void* arg, grpc_error* error) {
978 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
979 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
980 // Null payload means the LB call was cancelled.
981 if (lb_calld != grpclb_policy->lb_calld_.get() ||
982 lb_calld->recv_message_payload_ == nullptr) {
983 lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
// Copy the whole payload into one slice, then release the byte buffer so the
// member is free to receive the next message.
986 grpc_byte_buffer_reader bbr;
987 grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
988 grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
989 grpc_byte_buffer_reader_destroy(&bbr);
990 grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
991 lb_calld->recv_message_payload_ = nullptr;
992 grpc_grpclb_initial_response* initial_response;
993 grpc_grpclb_serverlist* serverlist;
994 if (!lb_calld->seen_initial_response_ &&
995 (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
997 // Have NOT seen initial response, look for initial response.
998 if (initial_response->has_client_stats_report_interval) {
// The reporting interval is clamped to a minimum of GPR_MS_PER_SEC.
999 lb_calld->client_stats_report_interval_ = GPR_MAX(
1000 GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
1001 &initial_response->client_stats_report_interval));
1002 if (grpc_lb_glb_trace.enabled()) {
1004 "[grpclb %p] lb_calld=%p: Received initial LB response "
1005 "message; client load reporting interval = %" PRId64
1007 grpclb_policy, lb_calld,
1008 lb_calld->client_stats_report_interval_);
1010 } else if (grpc_lb_glb_trace.enabled()) {
1012 "[grpclb %p] lb_calld=%p: Received initial LB response message; "
1013 "client load reporting NOT enabled",
1014 grpclb_policy, lb_calld);
1016 grpc_grpclb_initial_response_destroy(initial_response);
1017 lb_calld->seen_initial_response_ = true;
1018 } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
1019 response_slice)) != nullptr) {
1020 // Have seen initial response, look for serverlist.
1021 GPR_ASSERT(lb_calld->lb_call_ != nullptr);
// The parsed serverlist is handed to a ref-counted Serverlist wrapper.
1022 auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
1023 if (grpc_lb_glb_trace.enabled()) {
1024 UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
1026 "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1027 " servers received:\n%s",
1028 grpclb_policy, lb_calld, serverlist->num_servers,
1029 serverlist_text.get());
1031 lb_calld->seen_serverlist_ = true;
1032 // Start sending client load report only after we start using the
1033 // serverlist returned from the current LB call.
1034 if (lb_calld->client_stats_report_interval_ > 0 &&
1035 lb_calld->client_stats_ == nullptr) {
1036 lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1037 // Ref held by callback.
1038 lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
1039 lb_calld->ScheduleNextClientLoadReportLocked();
1041 // Check if the serverlist differs from the previous one.
1042 if (grpclb_policy->serverlist_ != nullptr &&
1043 *grpclb_policy->serverlist_ == *serverlist_wrapper) {
1044 if (grpc_lb_glb_trace.enabled()) {
1046 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
1047 "current, ignoring.",
1048 grpclb_policy, lb_calld);
1050 } else { // New serverlist.
1051 // Dispose of the fallback.
1052 // TODO(roth): Ideally, we should stay in fallback mode until we
1053 // know that we can reach at least one of the backends in the new
1054 // serverlist. Unfortunately, we can't do that, since we need to
1055 // send the new addresses to the child policy in order to determine
1056 // if they are reachable, and if we don't exit fallback mode now,
1057 // CreateOrUpdateChildPolicyLocked() will use the fallback
1058 // addresses instead of the addresses from the new serverlist.
1059 // However, if we can't reach any of the servers in the new
1060 // serverlist, then the child policy will never switch away from
1061 // the fallback addresses, but the grpclb policy will still think
1062 // that we're not in fallback mode, which means that we won't send
1063 // updates to the child policy when the fallback addresses are
1064 // updated by the resolver. This is sub-optimal, but the only way
1065 // to fix it is to maintain a completely separate child policy for
1066 // fallback mode, and that's more work than we want to put into
1067 // the grpclb implementation at this point, since we're deprecating
1068 // it in favor of the xds policy. We will implement this the
1069 // right way in the xds policy instead.
1070 if (grpclb_policy->fallback_mode_) {
1072 "[grpclb %p] Received response from balancer; exiting "
1075 grpclb_policy->fallback_mode_ = false;
// A serverlist arrived, so the fallback-at-startup machinery (timer and
// balancer channel connectivity watch) is no longer needed.
1077 if (grpclb_policy->fallback_at_startup_checks_pending_) {
1078 grpclb_policy->fallback_at_startup_checks_pending_ = false;
1079 grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1080 grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1082 // Update the serverlist in the GrpcLb instance. This serverlist
1083 // instance will be destroyed either upon the next update or when the
1084 // GrpcLb instance is destroyed.
1085 grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
1086 grpclb_policy->CreateOrUpdateChildPolicyLocked();
1089 // No valid initial response or serverlist found.
1090 char* response_slice_str =
1091 grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1093 "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1095 grpclb_policy, lb_calld, response_slice_str);
1096 gpr_free(response_slice_str);
// The slice read from the payload is released on every path that reaches
// this point.
1098 grpc_slice_unref_internal(response_slice);
1099 if (!grpclb_policy->shutting_down_) {
1100 // Keep listening for serverlist updates.
1102 memset(&op, 0, sizeof(op));
1103 op.op = GRPC_OP_RECV_MESSAGE;
1104 op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
1106 op.reserved = nullptr;
1107 // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1108 const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1109 lb_calld->lb_call_, &op, 1,
1110 &lb_calld->lb_on_balancer_message_received_);
1111 GPR_ASSERT(GRPC_CALL_OK == call_error);
// Policy is shutting down: no further receive is started, so the ref that
// was being reused for the receive loop is released.
1113 lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
// Callback invoked when the balancer call's final status has been received.
// \a arg is the BalancerCallState.
//
// If this call is still the policy's current lb_calld_, the call is treated
// as having failed: fallback mode may be entered, lb_calld_ is reset,
// re-resolution is requested, and a new balancer call is started either
// immediately (if an initial response had been seen) or via the retry timer.
// In all cases the "lb_call_ended" ref is dropped at the end.
// NOTE(review): short lines (braces, `else`, `gpr_log(` openers) are omitted
// from this listing; confirm branch structure against the full source.
1117 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1118 void* arg, grpc_error* error) {
1119 BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1120 GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1121 GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1122 if (grpc_lb_glb_trace.enabled()) {
// The C string copy of the status details is only needed for logging and is
// freed right after.
1123 char* status_details =
1124 grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
1126 "[grpclb %p] lb_calld=%p: Status from LB server received. "
1127 "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1128 grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
1129 lb_calld->lb_call_, grpc_error_string(error));
1130 gpr_free(status_details);
1132 // If this lb_calld is still in use, this call ended because of a failure so
1133 // we want to retry connecting. Otherwise, we have deliberately ended this
1134 // call and no further action is required.
1135 if (lb_calld == grpclb_policy->lb_calld_.get()) {
1136 // If we did not receive a serverlist and the fallback-at-startup checks
1137 // are pending, go into fallback mode immediately. This short-circuits
1138 // the timeout for the fallback-at-startup case.
1139 if (!lb_calld->seen_serverlist_ &&
1140 grpclb_policy->fallback_at_startup_checks_pending_) {
1142 "[grpclb %p] balancer call finished without receiving "
1143 "serverlist; entering fallback mode",
1145 grpclb_policy->fallback_at_startup_checks_pending_ = false;
1146 grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1147 grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1148 grpclb_policy->fallback_mode_ = true;
1149 grpclb_policy->CreateOrUpdateChildPolicyLocked();
1151 // This handles the fallback-after-startup case.
1152 grpclb_policy->MaybeEnterFallbackModeAfterStartup();
// Detach this call from the policy. `lb_calld` itself is still used below,
// and the "lb_call_ended" ref is only dropped at the end of this function.
1154 grpclb_policy->lb_calld_.reset();
1155 GPR_ASSERT(!grpclb_policy->shutting_down_);
1156 grpclb_policy->channel_control_helper()->RequestReresolution();
1157 if (lb_calld->seen_initial_response_) {
1158 // If we lose connection to the LB server, reset the backoff and restart
1159 // the LB call immediately.
1160 grpclb_policy->lb_call_backoff_.Reset();
1161 grpclb_policy->StartBalancerCallLocked();
1163 // If this LB call fails establishing any connection to the LB server,
1165 grpclb_policy->StartBalancerCallRetryTimerLocked();
1168 lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
1172 // helper code for creating balancer channel
// Returns the subset of \a addresses for which IsBalancer() is true. Each
// returned entry keeps the original address but gets a copy of its channel
// args with GRPC_ARG_ADDRESS_IS_BALANCER removed.
1175 ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
1176 ServerAddressList balancer_addresses;
1177 for (size_t i = 0; i < addresses.size(); ++i) {
1178 if (addresses[i].IsBalancer()) {
1179 // Strip out the is_balancer channel arg, since we don't want to
1180 // recursively use the grpclb policy in the channel used to talk to
1181 // the balancers. Note that we do NOT strip out the balancer_name
1182 // channel arg, since we need that to set the authority correctly
1183 // to talk to the balancers.
1184 static const char* args_to_remove[] = {
1185 GRPC_ARG_ADDRESS_IS_BALANCER,
// The new entry is constructed in place from the address and the filtered
// copy of its args.
1187 balancer_addresses.emplace_back(
1188 addresses[i].address(),
1189 grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
1190 GPR_ARRAY_SIZE(args_to_remove)));
1193 return balancer_addresses;
// Builds the channel args in two steps: the inherited \a args are copied with
// a fixed set of keys removed and three args added, and the result is then
// passed through grpc_lb_policy_grpclb_modify_lb_channel_args() together
// with \a addresses. The value returned by that call is returned as-is.
1196 /* Returns the channel args for the LB channel, used to create a bidirectional
1197 * stream for the reception of load balancing updates.
1200 * - \a addresses: corresponding to the balancers.
1201 * - \a response_generator: in order to propagate updates from the resolver
1202 * above the grpclb policy.
1203 * - \a args: other args inherited from the grpclb policy. */
1204 grpc_channel_args* BuildBalancerChannelArgs(
1205 const ServerAddressList& addresses,
1206 FakeResolverResponseGenerator* response_generator,
1207 const grpc_channel_args* args) {
1208 // Channel args to remove.
1209 static const char* args_to_remove[] = {
1210 // LB policy name, since we want to use the default (pick_first) in
1212 GRPC_ARG_LB_POLICY_NAME,
1213 // Strip out the service config, since we don't want the LB policy
1214 // config specified for the parent channel to affect the LB channel.
1215 GRPC_ARG_SERVICE_CONFIG,
1216 // The channel arg for the server URI, since that will be different for
1217 // the LB channel than for the parent channel. The client channel
1218 // factory will re-add this arg with the right value.
1219 GRPC_ARG_SERVER_URI,
1220 // The fake resolver response generator, because we are replacing it
1221 // with the one from the grpclb policy, used to propagate updates to
1223 GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
1224 // The LB channel should use the authority indicated by the target
1225 // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
1226 // as opposed to the authority from the parent channel.
1227 GRPC_ARG_DEFAULT_AUTHORITY,
1228 // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1229 // treated as a stand-alone channel and not inherit this argument from the
1230 // args of the parent channel.
1231 GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1233 // Channel args to add.
1234 const grpc_arg args_to_add[] = {
1235 // The fake resolver response generator, which we use to inject
1236 // address updates into the LB channel.
1237 grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
1238 response_generator),
1239 // A channel arg indicating the target is a grpclb load balancer.
1240 grpc_channel_arg_integer_create(
1241 const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
1242 // A channel arg indicating this is an internal channel, i.e. it is
1243 // owned by components in Core, not by the user application.
1244 grpc_channel_arg_integer_create(
1245 const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
1247 // Construct channel args.
1248 grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1249 args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
1250 GPR_ARRAY_SIZE(args_to_add));
1251 // Make any necessary modifications for security.
// NOTE(review): new_args is not destroyed here; presumably the callee takes
// ownership of it -- confirm against its declaration.
1252 return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args);
// Constructs the grpclb policy from \a args.
//
// Sets up the fake resolver response generator and the balancer-call backoff
// options, initializes the fallback-timer and connectivity-watch closures on
// the combiner, and records three settings from the channel args: the server
// name (taken from the path of GRPC_ARG_SERVER_URI), the LB call timeout and
// the fallback-at-startup timeout.
// NOTE(review): `args` is handed to the base class via std::move(args) and is
// then read again below (args.combiner, args.args). Whether that is safe
// depends on what Args' move constructor leaves behind, which is not visible
// here -- confirm. The first closure already uses combiner() instead.
1259 GrpcLb::GrpcLb(Args args)
1260 : LoadBalancingPolicy(std::move(args)),
1261 response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1264 .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1266 .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1267 .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1268 .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1271 GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1272 grpc_combiner_scheduler(combiner()));
1273 GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1274 &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1275 grpc_combiner_scheduler(args.combiner));
1276 gpr_mu_init(&child_policy_mu_);
1277 // Record server name.
1278 const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1279 const char* server_uri = grpc_channel_arg_get_string(arg);
1280 GPR_ASSERT(server_uri != nullptr);
// NOTE(review): `uri` is dereferenced below without a null check; confirm
// that grpc_uri_parse() cannot return nullptr for this input.
1281 grpc_uri* uri = grpc_uri_parse(server_uri, true);
1282 GPR_ASSERT(uri->path[0] != '\0');
// The server name is the URI path with one leading '/' removed, if present.
1283 server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1284 if (grpc_lb_glb_trace.enabled()) {
1286 "[grpclb %p] Will use '%s' as the server name for LB request.",
1287 this, server_name_);
1289 grpc_uri_destroy(uri);
1290 // Record LB call timeout.
// The braces are {default, min, max}-style options: default 0, range
// [0, INT_MAX] -- presumably; confirm against grpc_integer_options.
1291 arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1292 lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1293 // Record fallback-at-startup timeout.
1294 arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1295 fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
1296 arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
// NOTE(review): the header line of this definition is omitted from the
// listing; its position and contents suggest it is the GrpcLb destructor --
// confirm against the full source.
// Releases the server name allocated with gpr_strdup() in the constructor,
// the stored channel args, and the mutex initialized in the constructor.
1300 gpr_free((void*)server_name_);
1301 grpc_channel_args_destroy(args_);
1302 gpr_mu_destroy(&child_policy_mu_);
// Shuts the policy down: marks it as shutting down, cancels the retry timer
// and the fallback-at-startup checks if they are pending, detaches and
// releases both child policies, and destroys the balancer channel.
// NOTE(review): short lines are omitted from this listing (e.g. between
// `shutting_down_ = true` and the first `if`); confirm against full source.
1305 void GrpcLb::ShutdownLocked() {
1306 shutting_down_ = true;
1308 if (retry_timer_callback_pending_) {
1309 grpc_timer_cancel(&lb_call_retry_timer_);
1311 if (fallback_at_startup_checks_pending_) {
1312 grpc_timer_cancel(&lb_fallback_timer_);
1313 CancelBalancerChannelConnectivityWatchLocked();
// Undo the pollset_set link made in CreateChildPolicyLocked() for each child
// policy that exists.
1315 if (child_policy_ != nullptr) {
1316 grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1317 interested_parties());
1319 if (pending_child_policy_ != nullptr) {
1320 grpc_pollset_set_del_pollset_set(
1321 pending_child_policy_->interested_parties(), interested_parties());
// The child policy pointers are reset under child_policy_mu_, the same mutex
// FillChildRefsForChannelz() takes before reading them.
1324 MutexLock lock(&child_policy_mu_);
1325 child_policy_.reset();
1326 pending_child_policy_.reset();
1328 // We destroy the LB channel here instead of in our destructor because
1329 // destroying the channel triggers a last callback to
1330 // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1331 // alive when that callback is invoked.
1332 if (lb_channel_ != nullptr) {
1333 grpc_channel_destroy(lb_channel_);
1334 lb_channel_ = nullptr;
// A uuid of 0 makes FillChildRefsForChannelz() skip the balancer channel --
// presumably via the omitted check before its push_back; confirm.
1335 gpr_atm_no_barrier_store(&lb_channel_uuid_, 0);
// Resets connection backoff on the balancer channel and forwards the reset to
// the current and pending child policies. Each target is skipped if null.
1343 void GrpcLb::ResetBackoffLocked() {
1344 if (lb_channel_ != nullptr) {
1345 grpc_channel_reset_connect_backoff(lb_channel_);
1347 if (child_policy_ != nullptr) {
1348 child_policy_->ResetBackoffLocked();
1350 if (pending_child_policy_ != nullptr) {
1351 pending_child_policy_->ResetBackoffLocked();
// Populates the channelz child reference lists.
//
// \a child_subchannels and \a child_channels are output lists. The current
// and pending child policies are asked to fill them while child_policy_mu_
// is held, and the balancer channel's uuid (read atomically from
// lb_channel_uuid_) is appended to \a child_channels.
// NOTE(review): the trailing arguments of the delegated calls and the line
// before the push_back are omitted from this listing; presumably the uuid is
// only appended when non-zero -- confirm.
1355 void GrpcLb::FillChildRefsForChannelz(
1356 channelz::ChildRefsList* child_subchannels,
1357 channelz::ChildRefsList* child_channels) {
1359 // Delegate to the child policy to fill the children subchannels.
1360 // This must be done holding child_policy_mu_, since this method
1361 // does not run in the combiner.
1362 MutexLock lock(&child_policy_mu_);
1363 if (child_policy_ != nullptr) {
1364 child_policy_->FillChildRefsForChannelz(child_subchannels,
1367 if (pending_child_policy_ != nullptr) {
1368 pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
1372 gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_);
1374 child_channels->push_back(uuid);
// Applies a resolver update to the policy.
//
// Parses the LB config, processes the addresses and channel args (which
// creates the balancer channel on first use), and updates an existing child
// policy. On the initial update -- detected by lb_channel_ being null on
// entry -- it also starts the fallback-at-startup timer, starts watching the
// balancer channel's connectivity state, and starts the balancer call.
1378 void GrpcLb::UpdateLocked(UpdateArgs args) {
// Must be evaluated before ProcessAddressesAndChannelArgsLocked(), which
// creates lb_channel_ when it is null.
1379 const bool is_initial_update = lb_channel_ == nullptr;
1380 ParseLbConfig(args.config.get());
1381 ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
1382 // Update the existing child policy.
1383 if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1384 // If this is the initial update, start the fallback-at-startup checks
1385 // and the balancer call.
1386 if (is_initial_update) {
1387 fallback_at_startup_checks_pending_ = true;
1389 grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1390 Ref(DEBUG_LOCATION, "on_fallback_timer").release(); // Ref for callback
1391 grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1392 // Start watching the channel's connectivity state. If the channel
1393 // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1394 // fallback mode even if the fallback timeout has not elapsed.
1395 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1396 grpc_channel_get_channel_stack(lb_channel_));
1397 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1398 // Ref held by callback.
// This ref is dropped in OnBalancerChannelConnectivityChangedLocked() once
// the watch is no longer renewed.
1399 Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
1400 grpc_client_channel_watch_connectivity_state(
1401 client_channel_elem,
1402 grpc_polling_entity_create_from_pollset_set(interested_parties()),
1403 &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1405 // Start balancer call.
1406 StartBalancerCallLocked();
1411 // helpers for UpdateLocked()
1414 // Returns the backend addresses extracted from the given addresses.
// An address counts as a backend when IsBalancer() is false. Each returned
// entry keeps the original address and gets a copy of its channel args with
// one extra pointer arg, GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN, whose value is the
// payload of GRPC_MDELEM_LB_TOKEN_EMPTY.
1415 ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
1416 void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
// The same arg is reused for every backend; it is copied into each entry's
// args by grpc_channel_args_copy_and_add().
1417 grpc_arg arg = grpc_channel_arg_pointer_create(
1418 const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
1419 &lb_token_arg_vtable);
1420 ServerAddressList backend_addresses;
1421 for (size_t i = 0; i < addresses.size(); ++i) {
1422 if (!addresses[i].IsBalancer()) {
1423 backend_addresses.emplace_back(
1424 addresses[i].address(),
1425 grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
1428 return backend_addresses;
// Digests a resolver result.
//
// - Stores the non-balancer addresses as the fallback backend list.
// - Replaces args_ with a copy of \a args in which GRPC_ARG_LB_POLICY_NAME is
//   forced to "grpclb".
// - Builds the balancer channel args, creates lb_channel_ if it does not
//   exist yet (recording its channelz uuid), and pushes the balancer
//   addresses and args to the channel through the fake resolver response
//   generator.
1431 void GrpcLb::ProcessAddressesAndChannelArgsLocked(
1432 const ServerAddressList& addresses, const grpc_channel_args& args) {
1433 // Update fallback address list.
1434 fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1435 // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1436 // since we use this to trigger the client_load_reporting filter.
1437 static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1438 grpc_arg new_arg = grpc_channel_arg_string_create(
1439 (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
// The previous copy is destroyed before the replacement is built.
1440 grpc_channel_args_destroy(args_);
1441 args_ = grpc_channel_args_copy_and_add_and_remove(
1442 &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1443 // Construct args for balancer channel.
1444 ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
1445 grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
1446 balancer_addresses, response_generator_.get(), &args);
1447 // Create balancer channel if needed.
1448 if (lb_channel_ == nullptr) {
// The channel target is a "fake:" URI whose path is the server name.
// NOTE(review): the declaration and release of uri_str fall on lines omitted
// from this listing; confirm it is freed after CreateChannel().
1450 gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1452 channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
1453 GPR_ASSERT(lb_channel_ != nullptr);
1454 grpc_core::channelz::ChannelNode* channel_node =
1455 grpc_channel_get_channelz_node(lb_channel_);
1456 if (channel_node != nullptr) {
1457 gpr_atm_no_barrier_store(&lb_channel_uuid_, channel_node->uuid());
1461 // Propagate updates to the LB channel (pick_first) through the fake
1463 Resolver::Result result;
1464 result.addresses = std::move(balancer_addresses);
// NOTE(review): lb_channel_args is not destroyed in this function;
// presumably Resolver::Result takes ownership of it -- confirm.
1465 result.args = lb_channel_args;
1466 response_generator_->SetResponse(std::move(result));
// Extracts the child policy configuration from \a grpclb_config (may be null).
//
// Walks the config's JSON fields looking for "childPolicy" and parses it with
// ParseLoadBalancingConfig(). If a non-null child policy config results,
// child_policy_config_ is replaced with a new Config built from it and the
// service config; otherwise child_policy_config_ is reset.
// Returns early, leaving child_policy_config_ untouched, when a field has a
// null key or when "childPolicy" appears again after one was already parsed.
1469 void GrpcLb::ParseLbConfig(Config* grpclb_config) {
1470 const grpc_json* child_policy = nullptr;
1471 if (grpclb_config != nullptr) {
1472 const grpc_json* grpclb_config_json = grpclb_config->config();
1473 for (const grpc_json* field = grpclb_config_json; field != nullptr;
1474 field = field->next) {
1475 if (field->key == nullptr) return;
1476 if (strcmp(field->key, "childPolicy") == 0) {
1477 if (child_policy != nullptr) return; // Duplicate.
1478 child_policy = ParseLoadBalancingConfig(field);
// child_policy can only be non-null if grpclb_config was non-null, so the
// dereference of grpclb_config below is safe.
1482 if (child_policy != nullptr) {
1483 child_policy_config_ =
1484 MakeRefCounted<Config>(child_policy, grpclb_config->service_config());
1486 child_policy_config_.reset();
// Callback for the balancer channel connectivity watch. \a arg is the GrpcLb.
//
// While the fallback-at-startup checks are pending and the policy is not
// shutting down:
//  - any state other than TRANSIENT_FAILURE renews the watch and returns,
//    keeping the "watch_lb_channel_connectivity" ref;
//  - TRANSIENT_FAILURE cancels the fallback timer and enters fallback mode.
// In every case that does not renew the watch, the ref is dropped.
1490 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1491 grpc_error* error) {
1492 GrpcLb* self = static_cast<GrpcLb*>(arg);
1493 if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
1494 if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1495 // Not in TRANSIENT_FAILURE. Renew connectivity watch.
1496 grpc_channel_element* client_channel_elem =
1497 grpc_channel_stack_last_element(
1498 grpc_channel_get_channel_stack(self->lb_channel_));
1499 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1500 grpc_client_channel_watch_connectivity_state(
1501 client_channel_elem,
1502 grpc_polling_entity_create_from_pollset_set(
1503 self->interested_parties()),
1504 &self->lb_channel_connectivity_,
1505 &self->lb_channel_on_connectivity_changed_, nullptr);
1506 return; // Early out so we don't drop the ref below.
1508 // In TRANSIENT_FAILURE. Cancel the fallback timer and go into
1509 // fallback mode immediately.
1511 "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
1512 "entering fallback mode",
1514 self->fallback_at_startup_checks_pending_ = false;
1515 grpc_timer_cancel(&self->lb_fallback_timer_);
1516 self->fallback_mode_ = true;
1517 self->CreateOrUpdateChildPolicyLocked();
1519 // Done watching connectivity state, so drop ref.
1520 self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
// Cancels the balancer channel connectivity watch by calling the watch API
// again with the same closure but a null state pointer.
// NOTE(review): that a null state pointer means "cancel" is inferred from
// this function's name, not from anything visible here -- confirm against
// grpc_client_channel_watch_connectivity_state().
1523 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1524 grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1525 grpc_channel_get_channel_stack(lb_channel_));
1526 GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1527 grpc_client_channel_watch_connectivity_state(
1528 client_channel_elem,
1529 grpc_polling_entity_create_from_pollset_set(interested_parties()),
1530 nullptr, &lb_channel_on_connectivity_changed_, nullptr);
1534 // code for balancer channel and call
// Creates a new BalancerCallState and starts its query.
// Preconditions (asserted): the balancer channel exists and no balancer call
// is currently active. Does nothing if the policy is shutting down.
1537 void GrpcLb::StartBalancerCallLocked() {
1538 GPR_ASSERT(lb_channel_ != nullptr);
1539 if (shutting_down_) return;
1540 // Init the LB call data.
1541 GPR_ASSERT(lb_calld_ == nullptr);
// The call state is constructed with a new ref to this policy.
1542 lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1543 if (grpc_lb_glb_trace.enabled()) {
1545 "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1546 this, lb_channel_, lb_calld_.get());
1548 lb_calld_->StartQuery();
// Arms lb_call_retry_timer_ for the next balancer call attempt, using the
// next attempt time from lb_call_backoff_. When the timer callback runs,
// OnBalancerCallRetryTimerLocked() is invoked on the combiner.
// NOTE(review): the condition selecting between the two trace messages and
// the line following the Ref() call are omitted from this listing; presumably
// the latter releases `self` so the ref outlives this scope -- confirm.
1551 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1552 grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1553 if (grpc_lb_glb_trace.enabled()) {
1554 gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1555 grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1557 gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1560 gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1564 // TODO(roth): We currently track this ref manually. Once the
1565 // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1566 // with the callback.
1567 auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1569 GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1570 this, grpc_combiner_scheduler(combiner()));
// Lets ShutdownLocked() know there is a timer to cancel.
1571 retry_timer_callback_pending_ = true;
1572 grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
// Callback for the balancer call retry timer. \a arg is the GrpcLb.
//
// Clears the pending flag and restarts the balancer call, but only if the
// policy is not shutting down, the timer completed without error, and no
// balancer call is currently active. The "on_balancer_call_retry_timer" ref
// taken when the timer was armed is dropped at the end.
1575 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1576 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1577 grpclb_policy->retry_timer_callback_pending_ = false;
1578 if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1579 grpclb_policy->lb_calld_ == nullptr) {
1580 if (grpc_lb_glb_trace.enabled()) {
1581 gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1584 grpclb_policy->StartBalancerCallLocked();
1586 grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1590 // code for handling fallback mode
// Switches to fallback mode after startup when the conditions listed below
// all hold, then updates the child policy so that it picks up the fallback
// addresses. "Not in contact with the balancer" is implemented as: there is
// no active balancer call, or the active call has not yet seen a serverlist.
1593 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1594 // Enter fallback mode if all of the following are true:
1595 // - We are not currently in fallback mode.
1596 // - We are not currently waiting for the initial fallback timeout.
1597 // - We are not currently in contact with the balancer.
1598 // - The child policy is not in state READY.
1599 if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1600 (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1601 !child_policy_ready_) {
1603 "[grpclb %p] lost contact with balancer and backends from "
1604 "most recent serverlist; entering fallback mode",
1606 fallback_mode_ = true;
1607 CreateOrUpdateChildPolicyLocked();
// Callback for the fallback-at-startup timer. \a arg is the GrpcLb.
//
// Enters fallback mode if the startup checks are still pending, the policy is
// not shutting down, and the timer completed without error. In that case the
// balancer channel connectivity watch is cancelled as well. The
// "on_fallback_timer" ref taken in UpdateLocked() is always dropped.
1611 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1612 GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1613 // If we receive a serverlist after the timer fires but before this callback
1614 // actually runs, don't fall back.
1615 if (grpclb_policy->fallback_at_startup_checks_pending_ &&
1616 !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1618 "[grpclb %p] No response from balancer after fallback timeout; "
1619 "entering fallback mode",
1621 grpclb_policy->fallback_at_startup_checks_pending_ = false;
1622 grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1623 grpclb_policy->fallback_mode_ = true;
1624 grpclb_policy->CreateOrUpdateChildPolicyLocked();
1626 grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1630 // code for interacting with the child policy
// Returns a copy of args_ extended for the child policy.
//
// GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER is always added, with
// \a is_backend_from_grpclb_load_balancer as its integer value. When that
// flag is true, GRPC_ARG_INHIBIT_HEALTH_CHECKING=1 is added as well.
1633 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1634 bool is_backend_from_grpclb_load_balancer) {
// Sized for the maximum of two args; num_args_to_add tracks how many slots
// are actually in use.
1635 grpc_arg args_to_add[2] = {
1636 // A channel arg indicating if the target is a backend inferred from a
1637 // grpclb load balancer.
1638 grpc_channel_arg_integer_create(
1640 GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1641 is_backend_from_grpclb_load_balancer),
1643 size_t num_args_to_add = 1;
1644 if (is_backend_from_grpclb_load_balancer) {
1645 args_to_add[num_args_to_add++] = grpc_channel_arg_integer_create(
1646 const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
1648 return grpc_channel_args_copy_and_add(args_, args_to_add, num_args_to_add);
// Creates a child LB policy named \a name with channel args \a args.
//
// The child is created through the LoadBalancingPolicyRegistry, shares this
// policy's combiner, and is given a new Helper as its channel control helper.
// On success the helper is told which child it belongs to and the two
// policies' pollset_sets are linked. A creation failure is logged.
// NOTE(review): the lines after the error log and at the end of the function
// are omitted from this listing; presumably they return nullptr on failure
// and lb_policy on success -- confirm.
1651 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1652 const char* name, const grpc_channel_args* args) {
// The raw pointer is kept for set_child() below; ownership is transferred to
// lb_policy_args through the UniquePtr.
1653 Helper* helper = New<Helper>(Ref());
1654 LoadBalancingPolicy::Args lb_policy_args;
1655 lb_policy_args.combiner = combiner();
1656 lb_policy_args.args = args;
1657 lb_policy_args.channel_control_helper =
1658 UniquePtr<ChannelControlHelper>(helper);
1659 OrphanablePtr<LoadBalancingPolicy> lb_policy =
1660 LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1661 name, std::move(lb_policy_args));
1662 if (GPR_UNLIKELY(lb_policy == nullptr)) {
1663 gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
1667 helper->set_child(lb_policy.get());
1668 if (grpc_lb_glb_trace.enabled()) {
1669 gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
1670 name, lb_policy.get());
1672 // Add the gRPC LB's interested_parties pollset_set to that of the newly
1673 // created child policy. This will make the child policy progress upon
1674 // activity on gRPC LB, which in turn is tied to the application's call.
1675 grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1676 interested_parties());
1680 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1681 if (shutting_down_) return;
1682 // Construct update args.
1683 UpdateArgs update_args;
1684 bool is_backend_from_grpclb_load_balancer = false;
1685 if (fallback_mode_) {
1686 // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1687 // received any serverlist from the balancer, we use the fallback backends
1688 // returned by the resolver. Note that the fallback backend list may be
1689 // empty, in which case the new round_robin policy will keep the requested
1691 update_args.addresses = fallback_backend_addresses_;
1693 update_args.addresses = serverlist_->GetServerAddressList(
1694 lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1695 is_backend_from_grpclb_load_balancer = true;
1698 CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1699 GPR_ASSERT(update_args.args != nullptr);
1700 update_args.config = child_policy_config_;
1701 // If the child policy name changes, we need to create a new child
1702 // policy. When this happens, we leave child_policy_ as-is and store
1703 // the new child policy in pending_child_policy_. Once the new child
1704 // policy transitions into state READY, we swap it into child_policy_,
1705 // replacing the original child policy. So pending_child_policy_ is
1706 // non-null only between when we apply an update that changes the child
1707 // policy name and when the new child reports state READY.
1709 // Updates can arrive at any point during this transition. We always
1710 // apply updates relative to the most recently created child policy,
1711 // even if the most recent one is still in pending_child_policy_. This
1712 // is true both when applying the updates to an existing child policy
1713 // and when determining whether we need to create a new policy.
1715 // As a result of this, there are several cases to consider here:
1717 // 1. We have no existing child policy (i.e., we have started up but
1718 // have not yet received a serverlist from the balancer or gone
1719 // into fallback mode; in this case, both child_policy_ and
1720 // pending_child_policy_ are null). In this case, we create a
1721 // new child policy and store it in child_policy_.
1723 // 2. We have an existing child policy and have no pending child policy
1724 // from a previous update (i.e., either there has not been a
1725 // previous update that changed the policy name, or we have already
1726 // finished swapping in the new policy; in this case, child_policy_
1727 // is non-null but pending_child_policy_ is null). In this case:
1728 // a. If child_policy_->name() equals child_policy_name, then we
1729 // update the existing child policy.
1730 // b. If child_policy_->name() does not equal child_policy_name,
1731 // we create a new policy. The policy will be stored in
1732 // pending_child_policy_ and will later be swapped into
1733 // child_policy_ by the helper when the new child transitions
1734 // into state READY.
1736 // 3. We have an existing child policy and have a pending child policy
1737 // from a previous update (i.e., a previous update set
1738 // pending_child_policy_ as per case 2b above and that policy has
1739 // not yet transitioned into state READY and been swapped into
1740 // child_policy_; in this case, both child_policy_ and
1741 // pending_child_policy_ are non-null). In this case:
1742 // a. If pending_child_policy_->name() equals child_policy_name,
1743 // then we update the existing pending child policy.
1744 // b. If pending_child_policy->name() does not equal
1745 // child_policy_name, then we create a new policy. The new
1746 // policy is stored in pending_child_policy_ (replacing the one
1747 // that was there before, which will be immediately shut down)
1748 // and will later be swapped into child_policy_ by the helper
1749 // when the new child transitions into state READY.
1750 const char* child_policy_name = child_policy_config_ == nullptr
1752 : child_policy_config_->name();
1753 const bool create_policy =
1755 child_policy_ == nullptr ||
1757 (pending_child_policy_ == nullptr &&
1758 strcmp(child_policy_->name(), child_policy_name) != 0) ||
1760 (pending_child_policy_ != nullptr &&
1761 strcmp(pending_child_policy_->name(), child_policy_name) != 0);
1762 LoadBalancingPolicy* policy_to_update = nullptr;
1763 if (create_policy) {
1764 // Cases 1, 2b, and 3b: create a new child policy.
1765 // If child_policy_ is null, we set it (case 1), else we set
1766 // pending_child_policy_ (cases 2b and 3b).
1767 if (grpc_lb_glb_trace.enabled()) {
1768 gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
1769 child_policy_ == nullptr ? "" : "pending ", child_policy_name);
1772 CreateChildPolicyLocked(child_policy_name, update_args.args);
1773 // Swap the policy into place.
1775 child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
1777 MutexLock lock(&child_policy_mu_);
1778 lb_policy = std::move(new_policy);
1780 policy_to_update = lb_policy.get();
1782 // Cases 2a and 3a: update an existing policy.
1783 // If we have a pending child policy, send the update to the pending
1784 // policy (case 3a), else send it to the current policy (case 2a).
1785 policy_to_update = pending_child_policy_ != nullptr
1786 ? pending_child_policy_.get()
1787 : child_policy_.get();
1789 GPR_ASSERT(policy_to_update != nullptr);
1790 // Update the policy.
1791 if (grpc_lb_glb_trace.enabled()) {
1792 gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
1793 policy_to_update == pending_child_policy_.get() ? "pending " : "",
1796 policy_to_update->UpdateLocked(std::move(update_args));
1803 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1805 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1806 LoadBalancingPolicy::Args args) const override {
1807 return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
1810 const char* name() const override { return kGrpclb; }
1815 } // namespace grpc_core
//
// Plugin registration
//
1823 // Only add client_load_reporting filter if the grpclb LB policy is used.
1824 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1826 const grpc_channel_args* args =
1827 grpc_channel_stack_builder_get_channel_arguments(builder);
1828 const grpc_arg* channel_arg =
1829 grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1830 if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1831 strcmp(channel_arg->value.string, "grpclb") == 0) {
1832 return grpc_channel_stack_builder_append_filter(
1833 builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1840 void grpc_lb_policy_grpclb_init() {
1841 grpc_core::LoadBalancingPolicyRegistry::Builder::
1842 RegisterLoadBalancingPolicyFactory(
1843 grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1844 grpc_core::New<grpc_core::GrpcLbFactory>()));
1845 grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1846 GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1847 maybe_add_client_load_reporting_filter,
1848 (void*)&grpc_client_load_reporting_filter);
// Plugin shutdown hook for the grpclb LB policy; intentionally a no-op.
void grpc_lb_policy_grpclb_shutdown() {}