ed6e8de3f2134cdca4c86f70e1d0405f8535fea9
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / lb_policy / grpclb / grpclb.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
24 /// An internal channel (\a lb_channel_) is created for the addresses
25 /// from that are balancers.  This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose).  The call
31 /// will be complete when either the balancer sends status or when we cancel
32 /// the call (e.g., because we are shutting down).  In needed, we retry the
33 /// call.  If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
36 ///
37 /// We maintain an internal round_robin policy instance for distributing
38 /// requests across backends.  Whenever we receive a new serverlist from
39 /// the balancer, we update the round_robin policy with the new list of
40 /// addresses.  If we cannot communicate with the balancer on startup,
41 /// however, we may enter fallback mode, in which case we will populate
42 /// the child policy's addresses from the backend addresses returned by the
43 /// resolver.
44 ///
45 /// Once a child policy instance is in place (and getting updated as described),
46 /// calls for a pick, a ping, or a cancellation will be serviced right
47 /// away by forwarding them to the child policy instance.  Any time there's no
48 /// child policy available (i.e., right after the creation of the gRPCLB
49 /// policy), pick requests are queued.
50 ///
51 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
52 /// high level design and details.
53
54 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
55 // using that endpoint. Because of various transitive includes in uv.h,
56 // including windows.h on Windows, uv.h must be included before other system
57 // headers. Therefore, sockaddr.h must always be included first.
58 #include <grpc/support/port_platform.h>
59
60 #include "src/core/lib/iomgr/sockaddr.h"
61 #include "src/core/lib/iomgr/socket_utils.h"
62
63 #include <inttypes.h>
64 #include <limits.h>
65 #include <string.h>
66
67 #include <grpc/byte_buffer_reader.h>
68 #include <grpc/grpc.h>
69 #include <grpc/support/alloc.h>
70 #include <grpc/support/string_util.h>
71 #include <grpc/support/time.h>
72
73 #include "src/core/ext/filters/client_channel/client_channel.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
80 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
81 #include "src/core/ext/filters/client_channel/parse_address.h"
82 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
83 #include "src/core/ext/filters/client_channel/server_address.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/host_port.h"
88 #include "src/core/lib/gpr/string.h"
89 #include "src/core/lib/gprpp/manual_constructor.h"
90 #include "src/core/lib/gprpp/memory.h"
91 #include "src/core/lib/gprpp/orphanable.h"
92 #include "src/core/lib/gprpp/ref_counted_ptr.h"
93 #include "src/core/lib/iomgr/combiner.h"
94 #include "src/core/lib/iomgr/sockaddr.h"
95 #include "src/core/lib/iomgr/sockaddr_utils.h"
96 #include "src/core/lib/iomgr/timer.h"
97 #include "src/core/lib/slice/slice_hash_table.h"
98 #include "src/core/lib/slice/slice_internal.h"
99 #include "src/core/lib/slice/slice_string_helpers.h"
100 #include "src/core/lib/surface/call.h"
101 #include "src/core/lib/surface/channel.h"
102 #include "src/core/lib/surface/channel_init.h"
103 #include "src/core/lib/transport/static_metadata.h"
104
105 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
106 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
107 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
108 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
109 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
110
111 #define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
112
113 namespace grpc_core {
114
115 TraceFlag grpc_lb_glb_trace(false, "glb");
116
117 namespace {
118
119 constexpr char kGrpclb[] = "grpclb";
120
121 class ParsedGrpcLbConfig : public ParsedLoadBalancingConfig {
122  public:
123   explicit ParsedGrpcLbConfig(
124       RefCountedPtr<ParsedLoadBalancingConfig> child_policy)
125       : child_policy_(std::move(child_policy)) {}
126   const char* name() const override { return kGrpclb; }
127
128   RefCountedPtr<ParsedLoadBalancingConfig> child_policy() const {
129     return child_policy_;
130   }
131
132  private:
133   RefCountedPtr<ParsedLoadBalancingConfig> child_policy_;
134 };
135
136 class GrpcLb : public LoadBalancingPolicy {
137  public:
138   explicit GrpcLb(Args args);
139
140   const char* name() const override { return kGrpclb; }
141
142   void UpdateLocked(UpdateArgs args) override;
143   void ResetBackoffLocked() override;
144   void FillChildRefsForChannelz(
145       channelz::ChildRefsList* child_subchannels,
146       channelz::ChildRefsList* child_channels) override;
147
148  private:
149   /// Contains a call to the LB server and all the data related to the call.
150   class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
151    public:
152     explicit BalancerCallState(
153         RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
154
155     // It's the caller's responsibility to ensure that Orphan() is called from
156     // inside the combiner.
157     void Orphan() override;
158
159     void StartQuery();
160
161     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
162
163     bool seen_initial_response() const { return seen_initial_response_; }
164     bool seen_serverlist() const { return seen_serverlist_; }
165
166    private:
167     // So Delete() can access our private dtor.
168     template <typename T>
169     friend void grpc_core::Delete(T*);
170
171     ~BalancerCallState();
172
173     GrpcLb* grpclb_policy() const {
174       return static_cast<GrpcLb*>(grpclb_policy_.get());
175     }
176
177     void ScheduleNextClientLoadReportLocked();
178     void SendClientLoadReportLocked();
179
180     static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
181
182     static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
183     static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
184     static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
185     static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
186     static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
187
188     // The owning LB policy.
189     RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
190
191     // The streaming call to the LB server. Always non-NULL.
192     grpc_call* lb_call_ = nullptr;
193
194     // recv_initial_metadata
195     grpc_metadata_array lb_initial_metadata_recv_;
196
197     // send_message
198     grpc_byte_buffer* send_message_payload_ = nullptr;
199     grpc_closure lb_on_initial_request_sent_;
200
201     // recv_message
202     grpc_byte_buffer* recv_message_payload_ = nullptr;
203     grpc_closure lb_on_balancer_message_received_;
204     bool seen_initial_response_ = false;
205     bool seen_serverlist_ = false;
206
207     // recv_trailing_metadata
208     grpc_closure lb_on_balancer_status_received_;
209     grpc_metadata_array lb_trailing_metadata_recv_;
210     grpc_status_code lb_call_status_;
211     grpc_slice lb_call_status_details_;
212
213     // The stats for client-side load reporting associated with this LB call.
214     // Created after the first serverlist is received.
215     RefCountedPtr<GrpcLbClientStats> client_stats_;
216     grpc_millis client_stats_report_interval_ = 0;
217     grpc_timer client_load_report_timer_;
218     bool client_load_report_timer_callback_pending_ = false;
219     bool last_client_load_report_counters_were_zero_ = false;
220     bool client_load_report_is_due_ = false;
221     // The closure used for either the load report timer or the callback for
222     // completion of sending the load report.
223     grpc_closure client_load_report_closure_;
224   };
225
226   class Serverlist : public RefCounted<Serverlist> {
227    public:
228     // Takes ownership of serverlist.
229     explicit Serverlist(grpc_grpclb_serverlist* serverlist)
230         : serverlist_(serverlist) {}
231
232     ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
233
234     bool operator==(const Serverlist& other) const;
235
236     const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
237
238     // Returns a text representation suitable for logging.
239     UniquePtr<char> AsText() const;
240
241     // Extracts all non-drop entries into a ServerAddressList.
242     ServerAddressList GetServerAddressList(
243         GrpcLbClientStats* client_stats) const;
244
245     // Returns true if the serverlist contains at least one drop entry and
246     // no backend address entries.
247     bool ContainsAllDropEntries() const;
248
249     // Returns the LB token to use for a drop, or null if the call
250     // should not be dropped.
251     //
252     // Note: This is called from the picker, so it will be invoked in
253     // the channel's data plane combiner, NOT the control plane
254     // combiner.  It should not be accessed by any other part of the LB
255     // policy.
256     const char* ShouldDrop();
257
258    private:
259     grpc_grpclb_serverlist* serverlist_;
260
261     // Guarded by the channel's data plane combiner, NOT the control
262     // plane combiner.  It should not be accessed by anything but the
263     // picker via the ShouldDrop() method.
264     size_t drop_index_ = 0;
265   };
266
267   class Picker : public SubchannelPicker {
268    public:
269     Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
270            UniquePtr<SubchannelPicker> child_picker,
271            RefCountedPtr<GrpcLbClientStats> client_stats)
272         : parent_(parent),
273           serverlist_(std::move(serverlist)),
274           child_picker_(std::move(child_picker)),
275           client_stats_(std::move(client_stats)) {}
276
277     PickResult Pick(PickArgs* pick, grpc_error** error) override;
278
279    private:
280     // Storing the address for logging, but not holding a ref.
281     // DO NOT DEFERENCE!
282     GrpcLb* parent_;
283
284     // Serverlist to be used for determining drops.
285     RefCountedPtr<Serverlist> serverlist_;
286
287     UniquePtr<SubchannelPicker> child_picker_;
288     RefCountedPtr<GrpcLbClientStats> client_stats_;
289   };
290
291   class Helper : public ChannelControlHelper {
292    public:
293     explicit Helper(RefCountedPtr<GrpcLb> parent)
294         : parent_(std::move(parent)) {}
295
296     Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
297     grpc_channel* CreateChannel(const char* target,
298                                 const grpc_channel_args& args) override;
299     void UpdateState(grpc_connectivity_state state,
300                      UniquePtr<SubchannelPicker> picker) override;
301     void RequestReresolution() override;
302
303     void set_child(LoadBalancingPolicy* child) { child_ = child; }
304
305    private:
306     bool CalledByPendingChild() const;
307     bool CalledByCurrentChild() const;
308
309     RefCountedPtr<GrpcLb> parent_;
310     LoadBalancingPolicy* child_ = nullptr;
311   };
312
313   ~GrpcLb();
314
315   void ShutdownLocked() override;
316
317   // Helper functions used in UpdateLocked().
318   void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
319                                             const grpc_channel_args& args);
320   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
321                                                          grpc_error* error);
322   void CancelBalancerChannelConnectivityWatchLocked();
323
324   // Methods for dealing with fallback state.
325   void MaybeEnterFallbackModeAfterStartup();
326   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
327
328   // Methods for dealing with the balancer call.
329   void StartBalancerCallLocked();
330   void StartBalancerCallRetryTimerLocked();
331   static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
332
333   // Methods for dealing with the child policy.
334   grpc_channel_args* CreateChildPolicyArgsLocked(
335       bool is_backend_from_grpclb_load_balancer);
336   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
337       const char* name, const grpc_channel_args* args);
338   void CreateOrUpdateChildPolicyLocked();
339
340   // Who the client is trying to communicate with.
341   const char* server_name_ = nullptr;
342
343   // Current channel args from the resolver.
344   grpc_channel_args* args_ = nullptr;
345
346   // Internal state.
347   bool shutting_down_ = false;
348
349   // The channel for communicating with the LB server.
350   grpc_channel* lb_channel_ = nullptr;
351   // Uuid of the lb channel. Used for channelz.
352   gpr_atm lb_channel_uuid_ = 0;
353   // Response generator to inject address updates into lb_channel_.
354   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
355
356   // The data associated with the current LB call. It holds a ref to this LB
357   // policy. It's initialized every time we query for backends. It's reset to
358   // NULL whenever the current LB call is no longer needed (e.g., the LB policy
359   // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
360   // contains a non-NULL lb_call_.
361   OrphanablePtr<BalancerCallState> lb_calld_;
362   // Timeout in milliseconds for the LB call. 0 means no deadline.
363   int lb_call_timeout_ms_ = 0;
364   // Balancer call retry state.
365   BackOff lb_call_backoff_;
366   bool retry_timer_callback_pending_ = false;
367   grpc_timer lb_call_retry_timer_;
368   grpc_closure lb_on_call_retry_;
369
370   // The deserialized response from the balancer. May be nullptr until one
371   // such response has arrived.
372   RefCountedPtr<Serverlist> serverlist_;
373
374   // Whether we're in fallback mode.
375   bool fallback_mode_ = false;
376   // The backend addresses from the resolver.
377   ServerAddressList fallback_backend_addresses_;
378   // State for fallback-at-startup checks.
379   // Timeout after startup after which we will go into fallback mode if
380   // we have not received a serverlist from the balancer.
381   int fallback_at_startup_timeout_ = 0;
382   bool fallback_at_startup_checks_pending_ = false;
383   grpc_timer lb_fallback_timer_;
384   grpc_closure lb_on_fallback_;
385   grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
386   grpc_closure lb_channel_on_connectivity_changed_;
387
388   // Lock held when modifying the value of child_policy_ or
389   // pending_child_policy_.
390   gpr_mu child_policy_mu_;
391   // The child policy to use for the backends.
392   OrphanablePtr<LoadBalancingPolicy> child_policy_;
393   // When switching child policies, the new policy will be stored here
394   // until it reports READY, at which point it will be moved to child_policy_.
395   OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
396   // The child policy config.
397   RefCountedPtr<ParsedLoadBalancingConfig> child_policy_config_;
398   // Child policy in state READY.
399   bool child_policy_ready_ = false;
400 };
401
402 //
403 // GrpcLb::Serverlist
404 //
405
406 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
407   return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
408 }
409
410 void ParseServer(const grpc_grpclb_server* server,
411                  grpc_resolved_address* addr) {
412   memset(addr, 0, sizeof(*addr));
413   if (server->drop) return;
414   const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
415   /* the addresses are given in binary format (a in(6)_addr struct) in
416    * server->ip_address.bytes. */
417   const grpc_grpclb_ip_address* ip = &server->ip_address;
418   if (ip->size == 4) {
419     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
420     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
421     addr4->sin_family = GRPC_AF_INET;
422     memcpy(&addr4->sin_addr, ip->bytes, ip->size);
423     addr4->sin_port = netorder_port;
424   } else if (ip->size == 16) {
425     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
426     grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
427     addr6->sin6_family = GRPC_AF_INET6;
428     memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
429     addr6->sin6_port = netorder_port;
430   }
431 }
432
433 UniquePtr<char> GrpcLb::Serverlist::AsText() const {
434   gpr_strvec entries;
435   gpr_strvec_init(&entries);
436   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
437     const auto* server = serverlist_->servers[i];
438     char* ipport;
439     if (server->drop) {
440       ipport = gpr_strdup("(drop)");
441     } else {
442       grpc_resolved_address addr;
443       ParseServer(server, &addr);
444       grpc_sockaddr_to_string(&ipport, &addr, false);
445     }
446     char* entry;
447     gpr_asprintf(&entry, "  %" PRIuPTR ": %s token=%s\n", i, ipport,
448                  server->load_balance_token);
449     gpr_free(ipport);
450     gpr_strvec_add(&entries, entry);
451   }
452   UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
453   gpr_strvec_destroy(&entries);
454   return result;
455 }
456
457 // vtable for LB token channel arg.
458 void* lb_token_copy(void* token) {
459   return token == nullptr
460              ? nullptr
461              : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
462 }
463 void lb_token_destroy(void* token) {
464   if (token != nullptr) {
465     GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
466   }
467 }
468 int lb_token_cmp(void* token1, void* token2) {
469   // Always indicate a match, since we don't want this channel arg to
470   // affect the subchannel's key in the index.
471   return 0;
472 }
473 const grpc_arg_pointer_vtable lb_token_arg_vtable = {
474     lb_token_copy, lb_token_destroy, lb_token_cmp};
475
476 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
477   if (server->drop) return false;
478   const grpc_grpclb_ip_address* ip = &server->ip_address;
479   if (GPR_UNLIKELY(server->port >> 16 != 0)) {
480     if (log) {
481       gpr_log(GPR_ERROR,
482               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
483               server->port, (unsigned long)idx);
484     }
485     return false;
486   }
487   if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
488     if (log) {
489       gpr_log(GPR_ERROR,
490               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
491               "serverlist. Ignoring",
492               ip->size, (unsigned long)idx);
493     }
494     return false;
495   }
496   return true;
497 }
498
499 // Returns addresses extracted from the serverlist.
500 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
501     GrpcLbClientStats* client_stats) const {
502   ServerAddressList addresses;
503   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
504     const grpc_grpclb_server* server = serverlist_->servers[i];
505     if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
506     // Address processing.
507     grpc_resolved_address addr;
508     ParseServer(server, &addr);
509     // LB token processing.
510     grpc_mdelem lb_token;
511     if (server->has_load_balance_token) {
512       const size_t lb_token_max_length =
513           GPR_ARRAY_SIZE(server->load_balance_token);
514       const size_t lb_token_length =
515           strnlen(server->load_balance_token, lb_token_max_length);
516       grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
517           server->load_balance_token, lb_token_length);
518       lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
519       if (client_stats != nullptr) {
520         GPR_ASSERT(grpc_mdelem_set_user_data(
521                        lb_token, GrpcLbClientStats::Destroy,
522                        client_stats->Ref().release()) == client_stats);
523       }
524     } else {
525       char* uri = grpc_sockaddr_to_uri(&addr);
526       gpr_log(GPR_INFO,
527               "Missing LB token for backend address '%s'. The empty token will "
528               "be used instead",
529               uri);
530       gpr_free(uri);
531       lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
532     }
533     // Add address.
534     grpc_arg arg = grpc_channel_arg_pointer_create(
535         const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
536         (void*)lb_token.payload, &lb_token_arg_vtable);
537     grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
538     addresses.emplace_back(addr, args);
539     // Clean up.
540     GRPC_MDELEM_UNREF(lb_token);
541   }
542   return addresses;
543 }
544
545 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
546   if (serverlist_->num_servers == 0) return false;
547   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
548     if (!serverlist_->servers[i]->drop) return false;
549   }
550   return true;
551 }
552
553 const char* GrpcLb::Serverlist::ShouldDrop() {
554   if (serverlist_->num_servers == 0) return nullptr;
555   grpc_grpclb_server* server = serverlist_->servers[drop_index_];
556   drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
557   return server->drop ? server->load_balance_token : nullptr;
558 }
559
560 //
561 // GrpcLb::Picker
562 //
563
564 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
565   // Check if we should drop the call.
566   const char* drop_token = serverlist_->ShouldDrop();
567   if (drop_token != nullptr) {
568     // Update client load reporting stats to indicate the number of
569     // dropped calls.  Note that we have to do this here instead of in
570     // the client_load_reporting filter, because we do not create a
571     // subchannel call (and therefore no client_load_reporting filter)
572     // for dropped calls.
573     if (client_stats_ != nullptr) {
574       client_stats_->AddCallDropped(drop_token);
575     }
576     return PICK_COMPLETE;
577   }
578   // Forward pick to child policy.
579   PickResult result = child_picker_->Pick(pick, error);
580   // If pick succeeded, add LB token to initial metadata.
581   if (result == PickResult::PICK_COMPLETE &&
582       pick->connected_subchannel != nullptr) {
583     const grpc_arg* arg = grpc_channel_args_find(
584         pick->connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
585     if (arg == nullptr) {
586       gpr_log(GPR_ERROR,
587               "[grpclb %p picker %p] No LB token for connected subchannel "
588               "pick %p",
589               parent_, this, pick);
590       abort();
591     }
592     grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
593     GPR_ASSERT(!GRPC_MDISNULL(lb_token));
594     GPR_ASSERT(grpc_metadata_batch_add_tail(
595                    pick->initial_metadata, &pick->lb_token_mdelem_storage,
596                    GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
597     GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
598         grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
599     if (client_stats != nullptr) {
600       client_stats->AddCallStarted();
601     }
602   }
603   return result;
604 }
605
606 //
607 // GrpcLb::Helper
608 //
609
610 bool GrpcLb::Helper::CalledByPendingChild() const {
611   GPR_ASSERT(child_ != nullptr);
612   return child_ == parent_->pending_child_policy_.get();
613 }
614
615 bool GrpcLb::Helper::CalledByCurrentChild() const {
616   GPR_ASSERT(child_ != nullptr);
617   return child_ == parent_->child_policy_.get();
618 }
619
620 Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
621   if (parent_->shutting_down_ ||
622       (!CalledByPendingChild() && !CalledByCurrentChild())) {
623     return nullptr;
624   }
625   return parent_->channel_control_helper()->CreateSubchannel(args);
626 }
627
628 grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
629                                             const grpc_channel_args& args) {
630   if (parent_->shutting_down_ ||
631       (!CalledByPendingChild() && !CalledByCurrentChild())) {
632     return nullptr;
633   }
634   return parent_->channel_control_helper()->CreateChannel(target, args);
635 }
636
637 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
638                                  UniquePtr<SubchannelPicker> picker) {
639   if (parent_->shutting_down_) return;
640   // If this request is from the pending child policy, ignore it until
641   // it reports READY, at which point we swap it into place.
642   if (CalledByPendingChild()) {
643     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
644       gpr_log(GPR_INFO,
645               "[grpclb %p helper %p] pending child policy %p reports state=%s",
646               parent_.get(), this, parent_->pending_child_policy_.get(),
647               grpc_connectivity_state_name(state));
648     }
649     if (state != GRPC_CHANNEL_READY) return;
650     grpc_pollset_set_del_pollset_set(
651         parent_->child_policy_->interested_parties(),
652         parent_->interested_parties());
653     MutexLock lock(&parent_->child_policy_mu_);
654     parent_->child_policy_ = std::move(parent_->pending_child_policy_);
655   } else if (!CalledByCurrentChild()) {
656     // This request is from an outdated child, so ignore it.
657     return;
658   }
659   // Record whether child policy reports READY.
660   parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
661   // Enter fallback mode if needed.
662   parent_->MaybeEnterFallbackModeAfterStartup();
663   // There are three cases to consider here:
664   // 1. We're in fallback mode.  In this case, we're always going to use
665   //    the child policy's result, so we pass its picker through as-is.
666   // 2. The serverlist contains only drop entries.  In this case, we
667   //    want to use our own picker so that we can return the drops.
668   // 3. Not in fallback mode and serverlist is not all drops (i.e., it
669   //    may be empty or contain at least one backend address).  There are
670   //    two sub-cases:
671   //    a. The child policy is reporting state READY.  In this case, we wrap
672   //       the child's picker in our own, so that we can handle drops and LB
673   //       token metadata for each pick.
674   //    b. The child policy is reporting a state other than READY.  In this
675   //       case, we don't want to use our own picker, because we don't want
676   //       to process drops for picks that yield a QUEUE result; this would
677   //       result in dropping too many calls, since we will see the
678   //       queued picks multiple times, and we'd consider each one a
679   //       separate call for the drop calculation.
680   //
681   // Cases 1 and 3b: return picker from the child policy as-is.
682   if (parent_->serverlist_ == nullptr ||
683       (!parent_->serverlist_->ContainsAllDropEntries() &&
684        state != GRPC_CHANNEL_READY)) {
685     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
686       gpr_log(GPR_INFO,
687               "[grpclb %p helper %p] state=%s passing child picker %p as-is",
688               parent_.get(), this, grpc_connectivity_state_name(state),
689               picker.get());
690     }
691     parent_->channel_control_helper()->UpdateState(state, std::move(picker));
692     return;
693   }
694   // Cases 2 and 3a: wrap picker from the child in our own picker.
695   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
696     gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
697             parent_.get(), this, grpc_connectivity_state_name(state),
698             picker.get());
699   }
700   RefCountedPtr<GrpcLbClientStats> client_stats;
701   if (parent_->lb_calld_ != nullptr &&
702       parent_->lb_calld_->client_stats() != nullptr) {
703     client_stats = parent_->lb_calld_->client_stats()->Ref();
704   }
705   parent_->channel_control_helper()->UpdateState(
706       state, UniquePtr<SubchannelPicker>(
707                  New<Picker>(parent_.get(), parent_->serverlist_,
708                              std::move(picker), std::move(client_stats))));
709 }
710
711 void GrpcLb::Helper::RequestReresolution() {
712   if (parent_->shutting_down_) return;
713   const LoadBalancingPolicy* latest_child_policy =
714       parent_->pending_child_policy_ != nullptr
715           ? parent_->pending_child_policy_.get()
716           : parent_->child_policy_.get();
717   if (child_ != latest_child_policy) return;
718   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
719     gpr_log(GPR_INFO,
720             "[grpclb %p] Re-resolution requested from %schild policy (%p).",
721             parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
722   }
723   // If we are talking to a balancer, we expect to get updated addresses
724   // from the balancer, so we can ignore the re-resolution request from
725   // the child policy. Otherwise, pass the re-resolution request up to the
726   // channel.
727   if (parent_->lb_calld_ == nullptr ||
728       !parent_->lb_calld_->seen_initial_response()) {
729     parent_->channel_control_helper()->RequestReresolution();
730   }
731 }
732
733 //
734 // GrpcLb::BalancerCallState
735 //
736
737 GrpcLb::BalancerCallState::BalancerCallState(
738     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
739     : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace),
740       grpclb_policy_(std::move(parent_grpclb_policy)) {
741   GPR_ASSERT(grpclb_policy_ != nullptr);
742   GPR_ASSERT(!grpclb_policy()->shutting_down_);
743   // Init the LB call. Note that the LB call will progress every time there's
744   // activity in grpclb_policy_->interested_parties(), which is comprised of
745   // the polling entities from client_channel.
746   GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
747   GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
748   const grpc_millis deadline =
749       grpclb_policy()->lb_call_timeout_ms_ == 0
750           ? GRPC_MILLIS_INF_FUTURE
751           : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
752   lb_call_ = grpc_channel_create_pollset_set_call(
753       grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
754       grpclb_policy_->interested_parties(),
755       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
756       nullptr, deadline, nullptr);
757   // Init the LB call request payload.
758   grpc_grpclb_request* request =
759       grpc_grpclb_request_create(grpclb_policy()->server_name_);
760   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
761   send_message_payload_ =
762       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
763   grpc_slice_unref_internal(request_payload_slice);
764   grpc_grpclb_request_destroy(request);
765   // Init other data associated with the LB call.
766   grpc_metadata_array_init(&lb_initial_metadata_recv_);
767   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
768   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
769                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
770   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
771                     OnBalancerMessageReceivedLocked, this,
772                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
773   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
774                     OnBalancerStatusReceivedLocked, this,
775                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
776 }
777
778 GrpcLb::BalancerCallState::~BalancerCallState() {
779   GPR_ASSERT(lb_call_ != nullptr);
780   grpc_call_unref(lb_call_);
781   grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
782   grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
783   grpc_byte_buffer_destroy(send_message_payload_);
784   grpc_byte_buffer_destroy(recv_message_payload_);
785   grpc_slice_unref_internal(lb_call_status_details_);
786 }
787
788 void GrpcLb::BalancerCallState::Orphan() {
789   GPR_ASSERT(lb_call_ != nullptr);
790   // If we are here because grpclb_policy wants to cancel the call,
791   // lb_on_balancer_status_received_ will complete the cancellation and clean
792   // up. Otherwise, we are here because grpclb_policy has to orphan a failed
793   // call, then the following cancellation will be a no-op.
794   grpc_call_cancel(lb_call_, nullptr);
795   if (client_load_report_timer_callback_pending_) {
796     grpc_timer_cancel(&client_load_report_timer_);
797   }
798   // Note that the initial ref is hold by lb_on_balancer_status_received_
799   // instead of the caller of this function. So the corresponding unref happens
800   // in lb_on_balancer_status_received_ instead of here.
801 }
802
803 void GrpcLb::BalancerCallState::StartQuery() {
804   GPR_ASSERT(lb_call_ != nullptr);
805   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
806     gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
807             grpclb_policy_.get(), this, lb_call_);
808   }
809   // Create the ops.
810   grpc_call_error call_error;
811   grpc_op ops[3];
812   memset(ops, 0, sizeof(ops));
813   // Op: send initial metadata.
814   grpc_op* op = ops;
815   op->op = GRPC_OP_SEND_INITIAL_METADATA;
816   op->data.send_initial_metadata.count = 0;
817   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
818               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
819   op->reserved = nullptr;
820   op++;
821   // Op: send request message.
822   GPR_ASSERT(send_message_payload_ != nullptr);
823   op->op = GRPC_OP_SEND_MESSAGE;
824   op->data.send_message.send_message = send_message_payload_;
825   op->flags = 0;
826   op->reserved = nullptr;
827   op++;
828   // TODO(roth): We currently track this ref manually.  Once the
829   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
830   // with the callback.
831   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
832   self.release();
833   call_error = grpc_call_start_batch_and_execute(
834       lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
835   GPR_ASSERT(GRPC_CALL_OK == call_error);
836   // Op: recv initial metadata.
837   op = ops;
838   op->op = GRPC_OP_RECV_INITIAL_METADATA;
839   op->data.recv_initial_metadata.recv_initial_metadata =
840       &lb_initial_metadata_recv_;
841   op->flags = 0;
842   op->reserved = nullptr;
843   op++;
844   // Op: recv response.
845   op->op = GRPC_OP_RECV_MESSAGE;
846   op->data.recv_message.recv_message = &recv_message_payload_;
847   op->flags = 0;
848   op->reserved = nullptr;
849   op++;
850   // TODO(roth): We currently track this ref manually.  Once the
851   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
852   // with the callback.
853   self = Ref(DEBUG_LOCATION, "on_message_received");
854   self.release();
855   call_error = grpc_call_start_batch_and_execute(
856       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
857   GPR_ASSERT(GRPC_CALL_OK == call_error);
858   // Op: recv server status.
859   op = ops;
860   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
861   op->data.recv_status_on_client.trailing_metadata =
862       &lb_trailing_metadata_recv_;
863   op->data.recv_status_on_client.status = &lb_call_status_;
864   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
865   op->flags = 0;
866   op->reserved = nullptr;
867   op++;
868   // This callback signals the end of the LB call, so it relies on the initial
869   // ref instead of a new ref. When it's invoked, it's the initial ref that is
870   // unreffed.
871   call_error = grpc_call_start_batch_and_execute(
872       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
873   GPR_ASSERT(GRPC_CALL_OK == call_error);
874 }
875
876 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
877   const grpc_millis next_client_load_report_time =
878       ExecCtx::Get()->Now() + client_stats_report_interval_;
879   GRPC_CLOSURE_INIT(&client_load_report_closure_,
880                     MaybeSendClientLoadReportLocked, this,
881                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
882   grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
883                   &client_load_report_closure_);
884   client_load_report_timer_callback_pending_ = true;
885 }
886
887 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
888     void* arg, grpc_error* error) {
889   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
890   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
891   lb_calld->client_load_report_timer_callback_pending_ = false;
892   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
893     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
894     return;
895   }
896   // If we've already sent the initial request, then we can go ahead and send
897   // the load report. Otherwise, we need to wait until the initial request has
898   // been sent to send this (see OnInitialRequestSentLocked()).
899   if (lb_calld->send_message_payload_ == nullptr) {
900     lb_calld->SendClientLoadReportLocked();
901   } else {
902     lb_calld->client_load_report_is_due_ = true;
903   }
904 }
905
906 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
907     grpc_grpclb_request* request) {
908   GrpcLbClientStats::DroppedCallCounts* drop_entries =
909       static_cast<GrpcLbClientStats::DroppedCallCounts*>(
910           request->client_stats.calls_finished_with_drop.arg);
911   return request->client_stats.num_calls_started == 0 &&
912          request->client_stats.num_calls_finished == 0 &&
913          request->client_stats.num_calls_finished_with_client_failed_to_send ==
914              0 &&
915          request->client_stats.num_calls_finished_known_received == 0 &&
916          (drop_entries == nullptr || drop_entries->size() == 0);
917 }
918
919 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
920   // Construct message payload.
921   GPR_ASSERT(send_message_payload_ == nullptr);
922   grpc_grpclb_request* request =
923       grpc_grpclb_load_report_request_create(client_stats_.get());
924   // Skip client load report if the counters were all zero in the last
925   // report and they are still zero in this one.
926   if (LoadReportCountersAreZero(request)) {
927     if (last_client_load_report_counters_were_zero_) {
928       grpc_grpclb_request_destroy(request);
929       ScheduleNextClientLoadReportLocked();
930       return;
931     }
932     last_client_load_report_counters_were_zero_ = true;
933   } else {
934     last_client_load_report_counters_were_zero_ = false;
935   }
936   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
937   send_message_payload_ =
938       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
939   grpc_slice_unref_internal(request_payload_slice);
940   grpc_grpclb_request_destroy(request);
941   // Send the report.
942   grpc_op op;
943   memset(&op, 0, sizeof(op));
944   op.op = GRPC_OP_SEND_MESSAGE;
945   op.data.send_message.send_message = send_message_payload_;
946   GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
947                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
948   grpc_call_error call_error = grpc_call_start_batch_and_execute(
949       lb_call_, &op, 1, &client_load_report_closure_);
950   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
951     gpr_log(GPR_ERROR,
952             "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
953             grpclb_policy_.get(), this, call_error);
954     GPR_ASSERT(GRPC_CALL_OK == call_error);
955   }
956 }
957
958 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
959                                                            grpc_error* error) {
960   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
961   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
962   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
963   lb_calld->send_message_payload_ = nullptr;
964   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
965     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
966     return;
967   }
968   lb_calld->ScheduleNextClientLoadReportLocked();
969 }
970
971 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
972                                                            grpc_error* error) {
973   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
974   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
975   lb_calld->send_message_payload_ = nullptr;
976   // If we attempted to send a client load report before the initial request was
977   // sent (and this lb_calld is still in use), send the load report now.
978   if (lb_calld->client_load_report_is_due_ &&
979       lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
980     lb_calld->SendClientLoadReportLocked();
981     lb_calld->client_load_report_is_due_ = false;
982   }
983   lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
984 }
985
986 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
987     void* arg, grpc_error* error) {
988   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
989   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
990   // Null payload means the LB call was cancelled.
991   if (lb_calld != grpclb_policy->lb_calld_.get() ||
992       lb_calld->recv_message_payload_ == nullptr) {
993     lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
994     return;
995   }
996   grpc_byte_buffer_reader bbr;
997   grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
998   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
999   grpc_byte_buffer_reader_destroy(&bbr);
1000   grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
1001   lb_calld->recv_message_payload_ = nullptr;
1002   grpc_grpclb_initial_response* initial_response;
1003   grpc_grpclb_serverlist* serverlist;
1004   if (!lb_calld->seen_initial_response_ &&
1005       (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
1006           nullptr) {
1007     // Have NOT seen initial response, look for initial response.
1008     if (initial_response->has_client_stats_report_interval) {
1009       lb_calld->client_stats_report_interval_ = GPR_MAX(
1010           GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
1011                               &initial_response->client_stats_report_interval));
1012       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1013         gpr_log(GPR_INFO,
1014                 "[grpclb %p] lb_calld=%p: Received initial LB response "
1015                 "message; client load reporting interval = %" PRId64
1016                 " milliseconds",
1017                 grpclb_policy, lb_calld,
1018                 lb_calld->client_stats_report_interval_);
1019       }
1020     } else if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1021       gpr_log(GPR_INFO,
1022               "[grpclb %p] lb_calld=%p: Received initial LB response message; "
1023               "client load reporting NOT enabled",
1024               grpclb_policy, lb_calld);
1025     }
1026     grpc_grpclb_initial_response_destroy(initial_response);
1027     lb_calld->seen_initial_response_ = true;
1028   } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
1029                   response_slice)) != nullptr) {
1030     // Have seen initial response, look for serverlist.
1031     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1032     auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
1033     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1034       UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
1035       gpr_log(GPR_INFO,
1036               "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1037               " servers received:\n%s",
1038               grpclb_policy, lb_calld, serverlist->num_servers,
1039               serverlist_text.get());
1040     }
1041     lb_calld->seen_serverlist_ = true;
1042     // Start sending client load report only after we start using the
1043     // serverlist returned from the current LB call.
1044     if (lb_calld->client_stats_report_interval_ > 0 &&
1045         lb_calld->client_stats_ == nullptr) {
1046       lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1047       // Ref held by callback.
1048       lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
1049       lb_calld->ScheduleNextClientLoadReportLocked();
1050     }
1051     // Check if the serverlist differs from the previous one.
1052     if (grpclb_policy->serverlist_ != nullptr &&
1053         *grpclb_policy->serverlist_ == *serverlist_wrapper) {
1054       if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1055         gpr_log(GPR_INFO,
1056                 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
1057                 "current, ignoring.",
1058                 grpclb_policy, lb_calld);
1059       }
1060     } else {  // New serverlist.
1061       // Dispose of the fallback.
1062       // TODO(roth): Ideally, we should stay in fallback mode until we
1063       // know that we can reach at least one of the backends in the new
1064       // serverlist.  Unfortunately, we can't do that, since we need to
1065       // send the new addresses to the child policy in order to determine
1066       // if they are reachable, and if we don't exit fallback mode now,
1067       // CreateOrUpdateChildPolicyLocked() will use the fallback
1068       // addresses instead of the addresses from the new serverlist.
1069       // However, if we can't reach any of the servers in the new
1070       // serverlist, then the child policy will never switch away from
1071       // the fallback addresses, but the grpclb policy will still think
1072       // that we're not in fallback mode, which means that we won't send
1073       // updates to the child policy when the fallback addresses are
1074       // updated by the resolver.  This is sub-optimal, but the only way
1075       // to fix it is to maintain a completely separate child policy for
1076       // fallback mode, and that's more work than we want to put into
1077       // the grpclb implementation at this point, since we're deprecating
1078       // it in favor of the xds policy.  We will implement this the
1079       // right way in the xds policy instead.
1080       if (grpclb_policy->fallback_mode_) {
1081         gpr_log(GPR_INFO,
1082                 "[grpclb %p] Received response from balancer; exiting "
1083                 "fallback mode",
1084                 grpclb_policy);
1085         grpclb_policy->fallback_mode_ = false;
1086       }
1087       if (grpclb_policy->fallback_at_startup_checks_pending_) {
1088         grpclb_policy->fallback_at_startup_checks_pending_ = false;
1089         grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1090         grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1091       }
1092       // Update the serverlist in the GrpcLb instance. This serverlist
1093       // instance will be destroyed either upon the next update or when the
1094       // GrpcLb instance is destroyed.
1095       grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
1096       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1097     }
1098   } else {
1099     // No valid initial response or serverlist found.
1100     char* response_slice_str =
1101         grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1102     gpr_log(GPR_ERROR,
1103             "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1104             "Ignoring.",
1105             grpclb_policy, lb_calld, response_slice_str);
1106     gpr_free(response_slice_str);
1107   }
1108   grpc_slice_unref_internal(response_slice);
1109   if (!grpclb_policy->shutting_down_) {
1110     // Keep listening for serverlist updates.
1111     grpc_op op;
1112     memset(&op, 0, sizeof(op));
1113     op.op = GRPC_OP_RECV_MESSAGE;
1114     op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
1115     op.flags = 0;
1116     op.reserved = nullptr;
1117     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1118     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1119         lb_calld->lb_call_, &op, 1,
1120         &lb_calld->lb_on_balancer_message_received_);
1121     GPR_ASSERT(GRPC_CALL_OK == call_error);
1122   } else {
1123     lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1124   }
1125 }
1126
1127 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1128     void* arg, grpc_error* error) {
1129   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1130   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1131   GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1132   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1133     char* status_details =
1134         grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
1135     gpr_log(GPR_INFO,
1136             "[grpclb %p] lb_calld=%p: Status from LB server received. "
1137             "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1138             grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
1139             lb_calld->lb_call_, grpc_error_string(error));
1140     gpr_free(status_details);
1141   }
1142   // If this lb_calld is still in use, this call ended because of a failure so
1143   // we want to retry connecting. Otherwise, we have deliberately ended this
1144   // call and no further action is required.
1145   if (lb_calld == grpclb_policy->lb_calld_.get()) {
1146     // If the fallback-at-startup checks are pending, go into fallback mode
1147     // immediately.  This short-circuits the timeout for the fallback-at-startup
1148     // case.
1149     if (grpclb_policy->fallback_at_startup_checks_pending_) {
1150       GPR_ASSERT(!lb_calld->seen_serverlist_);
1151       gpr_log(GPR_INFO,
1152               "[grpclb %p] Balancer call finished without receiving "
1153               "serverlist; entering fallback mode",
1154               grpclb_policy);
1155       grpclb_policy->fallback_at_startup_checks_pending_ = false;
1156       grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1157       grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1158       grpclb_policy->fallback_mode_ = true;
1159       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1160     } else {
1161       // This handles the fallback-after-startup case.
1162       grpclb_policy->MaybeEnterFallbackModeAfterStartup();
1163     }
1164     grpclb_policy->lb_calld_.reset();
1165     GPR_ASSERT(!grpclb_policy->shutting_down_);
1166     grpclb_policy->channel_control_helper()->RequestReresolution();
1167     if (lb_calld->seen_initial_response_) {
1168       // If we lose connection to the LB server, reset the backoff and restart
1169       // the LB call immediately.
1170       grpclb_policy->lb_call_backoff_.Reset();
1171       grpclb_policy->StartBalancerCallLocked();
1172     } else {
1173       // If this LB call fails establishing any connection to the LB server,
1174       // retry later.
1175       grpclb_policy->StartBalancerCallRetryTimerLocked();
1176     }
1177   }
1178   lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
1179 }
1180
1181 //
1182 // helper code for creating balancer channel
1183 //
1184
1185 ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
1186   ServerAddressList balancer_addresses;
1187   for (size_t i = 0; i < addresses.size(); ++i) {
1188     if (addresses[i].IsBalancer()) {
1189       // Strip out the is_balancer channel arg, since we don't want to
1190       // recursively use the grpclb policy in the channel used to talk to
1191       // the balancers.  Note that we do NOT strip out the balancer_name
1192       // channel arg, since we need that to set the authority correctly
1193       // to talk to the balancers.
1194       static const char* args_to_remove[] = {
1195           GRPC_ARG_ADDRESS_IS_BALANCER,
1196       };
1197       balancer_addresses.emplace_back(
1198           addresses[i].address(),
1199           grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
1200                                             GPR_ARRAY_SIZE(args_to_remove)));
1201     }
1202   }
1203   return balancer_addresses;
1204 }
1205
1206 /* Returns the channel args for the LB channel, used to create a bidirectional
1207  * stream for the reception of load balancing updates.
1208  *
1209  * Inputs:
1210  *   - \a addresses: corresponding to the balancers.
1211  *   - \a response_generator: in order to propagate updates from the resolver
1212  *   above the grpclb policy.
1213  *   - \a args: other args inherited from the grpclb policy. */
1214 grpc_channel_args* BuildBalancerChannelArgs(
1215     const ServerAddressList& addresses,
1216     FakeResolverResponseGenerator* response_generator,
1217     const grpc_channel_args* args) {
1218   // Channel args to remove.
1219   static const char* args_to_remove[] = {
1220       // LB policy name, since we want to use the default (pick_first) in
1221       // the LB channel.
1222       GRPC_ARG_LB_POLICY_NAME,
1223       // Strip out the service config, since we don't want the LB policy
1224       // config specified for the parent channel to affect the LB channel.
1225       GRPC_ARG_SERVICE_CONFIG,
1226       // The channel arg for the server URI, since that will be different for
1227       // the LB channel than for the parent channel.  The client channel
1228       // factory will re-add this arg with the right value.
1229       GRPC_ARG_SERVER_URI,
1230       // The fake resolver response generator, because we are replacing it
1231       // with the one from the grpclb policy, used to propagate updates to
1232       // the LB channel.
1233       GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
1234       // The LB channel should use the authority indicated by the target
1235       // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
1236       // as opposed to the authority from the parent channel.
1237       GRPC_ARG_DEFAULT_AUTHORITY,
1238       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1239       // treated as a stand-alone channel and not inherit this argument from the
1240       // args of the parent channel.
1241       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1242   };
1243   // Channel args to add.
1244   const grpc_arg args_to_add[] = {
1245       // The fake resolver response generator, which we use to inject
1246       // address updates into the LB channel.
1247       grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
1248           response_generator),
1249       // A channel arg indicating the target is a grpclb load balancer.
1250       grpc_channel_arg_integer_create(
1251           const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
1252       // A channel arg indicating this is an internal channels, aka it is
1253       // owned by components in Core, not by the user application.
1254       grpc_channel_arg_integer_create(
1255           const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
1256   };
1257   // Construct channel args.
1258   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1259       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
1260       GPR_ARRAY_SIZE(args_to_add));
1261   // Make any necessary modifications for security.
1262   return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args);
1263 }
1264
1265 //
1266 // ctor and dtor
1267 //
1268
1269 GrpcLb::GrpcLb(Args args)
1270     : LoadBalancingPolicy(std::move(args)),
1271       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1272       lb_call_backoff_(
1273           BackOff::Options()
1274               .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1275                                    1000)
1276               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1277               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1278               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1279                                1000)) {
1280   // Initialization.
1281   GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1282                     grpc_combiner_scheduler(combiner()));
1283   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1284                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1285                     grpc_combiner_scheduler(args.combiner));
1286   gpr_mu_init(&child_policy_mu_);
1287   // Record server name.
1288   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1289   const char* server_uri = grpc_channel_arg_get_string(arg);
1290   GPR_ASSERT(server_uri != nullptr);
1291   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1292   GPR_ASSERT(uri->path[0] != '\0');
1293   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1294   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1295     gpr_log(GPR_INFO,
1296             "[grpclb %p] Will use '%s' as the server name for LB request.",
1297             this, server_name_);
1298   }
1299   grpc_uri_destroy(uri);
1300   // Record LB call timeout.
1301   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1302   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1303   // Record fallback-at-startup timeout.
1304   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1305   fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
1306       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1307 }
1308
1309 GrpcLb::~GrpcLb() {
1310   gpr_free((void*)server_name_);
1311   grpc_channel_args_destroy(args_);
1312   gpr_mu_destroy(&child_policy_mu_);
1313 }
1314
1315 void GrpcLb::ShutdownLocked() {
1316   shutting_down_ = true;
1317   lb_calld_.reset();
1318   if (retry_timer_callback_pending_) {
1319     grpc_timer_cancel(&lb_call_retry_timer_);
1320   }
1321   if (fallback_at_startup_checks_pending_) {
1322     grpc_timer_cancel(&lb_fallback_timer_);
1323     CancelBalancerChannelConnectivityWatchLocked();
1324   }
1325   if (child_policy_ != nullptr) {
1326     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1327                                      interested_parties());
1328   }
1329   if (pending_child_policy_ != nullptr) {
1330     grpc_pollset_set_del_pollset_set(
1331         pending_child_policy_->interested_parties(), interested_parties());
1332   }
1333   {
1334     MutexLock lock(&child_policy_mu_);
1335     child_policy_.reset();
1336     pending_child_policy_.reset();
1337   }
1338   // We destroy the LB channel here instead of in our destructor because
1339   // destroying the channel triggers a last callback to
1340   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1341   // alive when that callback is invoked.
1342   if (lb_channel_ != nullptr) {
1343     grpc_channel_destroy(lb_channel_);
1344     lb_channel_ = nullptr;
1345     gpr_atm_no_barrier_store(&lb_channel_uuid_, 0);
1346   }
1347 }
1348
1349 //
1350 // public methods
1351 //
1352
1353 void GrpcLb::ResetBackoffLocked() {
1354   if (lb_channel_ != nullptr) {
1355     grpc_channel_reset_connect_backoff(lb_channel_);
1356   }
1357   if (child_policy_ != nullptr) {
1358     child_policy_->ResetBackoffLocked();
1359   }
1360   if (pending_child_policy_ != nullptr) {
1361     pending_child_policy_->ResetBackoffLocked();
1362   }
1363 }
1364
1365 void GrpcLb::FillChildRefsForChannelz(
1366     channelz::ChildRefsList* child_subchannels,
1367     channelz::ChildRefsList* child_channels) {
1368   {
1369     // Delegate to the child policy to fill the children subchannels.
1370     // This must be done holding child_policy_mu_, since this method
1371     // does not run in the combiner.
1372     MutexLock lock(&child_policy_mu_);
1373     if (child_policy_ != nullptr) {
1374       child_policy_->FillChildRefsForChannelz(child_subchannels,
1375                                               child_channels);
1376     }
1377     if (pending_child_policy_ != nullptr) {
1378       pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
1379                                                       child_channels);
1380     }
1381   }
1382   gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_);
1383   if (uuid != 0) {
1384     child_channels->push_back(uuid);
1385   }
1386 }
1387
1388 void GrpcLb::UpdateLocked(UpdateArgs args) {
1389   const bool is_initial_update = lb_channel_ == nullptr;
1390   auto* grpclb_config =
1391       static_cast<const ParsedGrpcLbConfig*>(args.config.get());
1392   if (grpclb_config != nullptr) {
1393     child_policy_config_ = grpclb_config->child_policy();
1394   } else {
1395     child_policy_config_ = nullptr;
1396   }
1397   ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
1398   // Update the existing child policy.
1399   if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1400   // If this is the initial update, start the fallback-at-startup checks
1401   // and the balancer call.
1402   if (is_initial_update) {
1403     fallback_at_startup_checks_pending_ = true;
1404     // Start timer.
1405     grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1406     Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Ref for callback
1407     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1408     // Start watching the channel's connectivity state.  If the channel
1409     // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1410     // fallback mode even if the fallback timeout has not elapsed.
1411     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1412         grpc_channel_get_channel_stack(lb_channel_));
1413     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1414     // Ref held by callback.
1415     Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
1416     grpc_client_channel_watch_connectivity_state(
1417         client_channel_elem,
1418         grpc_polling_entity_create_from_pollset_set(interested_parties()),
1419         &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1420         nullptr);
1421     // Start balancer call.
1422     StartBalancerCallLocked();
1423   }
1424 }
1425
1426 //
1427 // helpers for UpdateLocked()
1428 //
1429
1430 // Returns the backend addresses extracted from the given addresses.
1431 ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
1432   void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
1433   grpc_arg arg = grpc_channel_arg_pointer_create(
1434       const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
1435       &lb_token_arg_vtable);
1436   ServerAddressList backend_addresses;
1437   for (size_t i = 0; i < addresses.size(); ++i) {
1438     if (!addresses[i].IsBalancer()) {
1439       backend_addresses.emplace_back(
1440           addresses[i].address(),
1441           grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
1442     }
1443   }
1444   return backend_addresses;
1445 }
1446
1447 void GrpcLb::ProcessAddressesAndChannelArgsLocked(
1448     const ServerAddressList& addresses, const grpc_channel_args& args) {
1449   // Update fallback address list.
1450   fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1451   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1452   // since we use this to trigger the client_load_reporting filter.
1453   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1454   grpc_arg new_arg = grpc_channel_arg_string_create(
1455       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1456   grpc_channel_args_destroy(args_);
1457   args_ = grpc_channel_args_copy_and_add_and_remove(
1458       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1459   // Construct args for balancer channel.
1460   ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
1461   grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
1462       balancer_addresses, response_generator_.get(), &args);
1463   // Create balancer channel if needed.
1464   if (lb_channel_ == nullptr) {
1465     char* uri_str;
1466     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1467     lb_channel_ =
1468         channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
1469     GPR_ASSERT(lb_channel_ != nullptr);
1470     grpc_core::channelz::ChannelNode* channel_node =
1471         grpc_channel_get_channelz_node(lb_channel_);
1472     if (channel_node != nullptr) {
1473       gpr_atm_no_barrier_store(&lb_channel_uuid_, channel_node->uuid());
1474     }
1475     gpr_free(uri_str);
1476   }
1477   // Propagate updates to the LB channel (pick_first) through the fake
1478   // resolver.
1479   Resolver::Result result;
1480   result.addresses = std::move(balancer_addresses);
1481   result.args = lb_channel_args;
1482   response_generator_->SetResponse(std::move(result));
1483 }
1484
1485 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1486                                                         grpc_error* error) {
1487   GrpcLb* self = static_cast<GrpcLb*>(arg);
1488   if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
1489     if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1490       // Not in TRANSIENT_FAILURE.  Renew connectivity watch.
1491       grpc_channel_element* client_channel_elem =
1492           grpc_channel_stack_last_element(
1493               grpc_channel_get_channel_stack(self->lb_channel_));
1494       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1495       grpc_client_channel_watch_connectivity_state(
1496           client_channel_elem,
1497           grpc_polling_entity_create_from_pollset_set(
1498               self->interested_parties()),
1499           &self->lb_channel_connectivity_,
1500           &self->lb_channel_on_connectivity_changed_, nullptr);
1501       return;  // Early out so we don't drop the ref below.
1502     }
1503     // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
1504     // fallback mode immediately.
1505     gpr_log(GPR_INFO,
1506             "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
1507             "entering fallback mode",
1508             self);
1509     self->fallback_at_startup_checks_pending_ = false;
1510     grpc_timer_cancel(&self->lb_fallback_timer_);
1511     self->fallback_mode_ = true;
1512     self->CreateOrUpdateChildPolicyLocked();
1513   }
1514   // Done watching connectivity state, so drop ref.
1515   self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1516 }
1517
1518 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1519   grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1520       grpc_channel_get_channel_stack(lb_channel_));
1521   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1522   grpc_client_channel_watch_connectivity_state(
1523       client_channel_elem,
1524       grpc_polling_entity_create_from_pollset_set(interested_parties()),
1525       nullptr, &lb_channel_on_connectivity_changed_, nullptr);
1526 }
1527
1528 //
1529 // code for balancer channel and call
1530 //
1531
1532 void GrpcLb::StartBalancerCallLocked() {
1533   GPR_ASSERT(lb_channel_ != nullptr);
1534   if (shutting_down_) return;
1535   // Init the LB call data.
1536   GPR_ASSERT(lb_calld_ == nullptr);
1537   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1538   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1539     gpr_log(GPR_INFO,
1540             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1541             this, lb_channel_, lb_calld_.get());
1542   }
1543   lb_calld_->StartQuery();
1544 }
1545
1546 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1547   grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1548   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1549     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1550     grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1551     if (timeout > 0) {
1552       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1553               this, timeout);
1554     } else {
1555       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1556               this);
1557     }
1558   }
1559   // TODO(roth): We currently track this ref manually.  Once the
1560   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1561   // with the callback.
1562   auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1563   self.release();
1564   GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1565                     this, grpc_combiner_scheduler(combiner()));
1566   retry_timer_callback_pending_ = true;
1567   grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1568 }
1569
1570 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1571   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1572   grpclb_policy->retry_timer_callback_pending_ = false;
1573   if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1574       grpclb_policy->lb_calld_ == nullptr) {
1575     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1576       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1577               grpclb_policy);
1578     }
1579     grpclb_policy->StartBalancerCallLocked();
1580   }
1581   grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1582 }
1583
1584 //
1585 // code for handling fallback mode
1586 //
1587
1588 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1589   // Enter fallback mode if all of the following are true:
1590   // - We are not currently in fallback mode.
1591   // - We are not currently waiting for the initial fallback timeout.
1592   // - We are not currently in contact with the balancer.
1593   // - The child policy is not in state READY.
1594   if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1595       (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1596       !child_policy_ready_) {
1597     gpr_log(GPR_INFO,
1598             "[grpclb %p] lost contact with balancer and backends from "
1599             "most recent serverlist; entering fallback mode",
1600             this);
1601     fallback_mode_ = true;
1602     CreateOrUpdateChildPolicyLocked();
1603   }
1604 }
1605
1606 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1607   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1608   // If we receive a serverlist after the timer fires but before this callback
1609   // actually runs, don't fall back.
1610   if (grpclb_policy->fallback_at_startup_checks_pending_ &&
1611       !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1612     gpr_log(GPR_INFO,
1613             "[grpclb %p] No response from balancer after fallback timeout; "
1614             "entering fallback mode",
1615             grpclb_policy);
1616     grpclb_policy->fallback_at_startup_checks_pending_ = false;
1617     grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1618     grpclb_policy->fallback_mode_ = true;
1619     grpclb_policy->CreateOrUpdateChildPolicyLocked();
1620   }
1621   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1622 }
1623
1624 //
1625 // code for interacting with the child policy
1626 //
1627
1628 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1629     bool is_backend_from_grpclb_load_balancer) {
1630   InlinedVector<grpc_arg, 2> args_to_add;
1631   args_to_add.emplace_back(grpc_channel_arg_integer_create(
1632       const_cast<char*>(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1633       is_backend_from_grpclb_load_balancer));
1634   if (is_backend_from_grpclb_load_balancer) {
1635     args_to_add.emplace_back(grpc_channel_arg_integer_create(
1636         const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1));
1637   }
1638   return grpc_channel_args_copy_and_add(args_, args_to_add.data(),
1639                                         args_to_add.size());
1640 }
1641
1642 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1643     const char* name, const grpc_channel_args* args) {
1644   Helper* helper = New<Helper>(Ref());
1645   LoadBalancingPolicy::Args lb_policy_args;
1646   lb_policy_args.combiner = combiner();
1647   lb_policy_args.args = args;
1648   lb_policy_args.channel_control_helper =
1649       UniquePtr<ChannelControlHelper>(helper);
1650   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1651       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1652           name, std::move(lb_policy_args));
1653   if (GPR_UNLIKELY(lb_policy == nullptr)) {
1654     gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
1655             name);
1656     return nullptr;
1657   }
1658   helper->set_child(lb_policy.get());
1659   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1660     gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
1661             name, lb_policy.get());
1662   }
1663   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1664   // created child policy. This will make the child policy progress upon
1665   // activity on gRPC LB, which in turn is tied to the application's call.
1666   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1667                                    interested_parties());
1668   return lb_policy;
1669 }
1670
1671 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1672   if (shutting_down_) return;
1673   // Construct update args.
1674   UpdateArgs update_args;
1675   bool is_backend_from_grpclb_load_balancer = false;
1676   if (fallback_mode_) {
1677     // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1678     // received any serverlist from the balancer, we use the fallback backends
1679     // returned by the resolver. Note that the fallback backend list may be
1680     // empty, in which case the new round_robin policy will keep the requested
1681     // picks pending.
1682     update_args.addresses = fallback_backend_addresses_;
1683   } else {
1684     update_args.addresses = serverlist_->GetServerAddressList(
1685         lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1686     is_backend_from_grpclb_load_balancer = true;
1687   }
1688   update_args.args =
1689       CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1690   GPR_ASSERT(update_args.args != nullptr);
1691   update_args.config = child_policy_config_;
1692   // If the child policy name changes, we need to create a new child
1693   // policy.  When this happens, we leave child_policy_ as-is and store
1694   // the new child policy in pending_child_policy_.  Once the new child
1695   // policy transitions into state READY, we swap it into child_policy_,
1696   // replacing the original child policy.  So pending_child_policy_ is
1697   // non-null only between when we apply an update that changes the child
1698   // policy name and when the new child reports state READY.
1699   //
1700   // Updates can arrive at any point during this transition.  We always
1701   // apply updates relative to the most recently created child policy,
1702   // even if the most recent one is still in pending_child_policy_.  This
1703   // is true both when applying the updates to an existing child policy
1704   // and when determining whether we need to create a new policy.
1705   //
1706   // As a result of this, there are several cases to consider here:
1707   //
1708   // 1. We have no existing child policy (i.e., we have started up but
1709   //    have not yet received a serverlist from the balancer or gone
1710   //    into fallback mode; in this case, both child_policy_ and
1711   //    pending_child_policy_ are null).  In this case, we create a
1712   //    new child policy and store it in child_policy_.
1713   //
1714   // 2. We have an existing child policy and have no pending child policy
1715   //    from a previous update (i.e., either there has not been a
1716   //    previous update that changed the policy name, or we have already
1717   //    finished swapping in the new policy; in this case, child_policy_
1718   //    is non-null but pending_child_policy_ is null).  In this case:
1719   //    a. If child_policy_->name() equals child_policy_name, then we
1720   //       update the existing child policy.
1721   //    b. If child_policy_->name() does not equal child_policy_name,
1722   //       we create a new policy.  The policy will be stored in
1723   //       pending_child_policy_ and will later be swapped into
1724   //       child_policy_ by the helper when the new child transitions
1725   //       into state READY.
1726   //
1727   // 3. We have an existing child policy and have a pending child policy
1728   //    from a previous update (i.e., a previous update set
1729   //    pending_child_policy_ as per case 2b above and that policy has
1730   //    not yet transitioned into state READY and been swapped into
1731   //    child_policy_; in this case, both child_policy_ and
1732   //    pending_child_policy_ are non-null).  In this case:
1733   //    a. If pending_child_policy_->name() equals child_policy_name,
1734   //       then we update the existing pending child policy.
1735   //    b. If pending_child_policy->name() does not equal
1736   //       child_policy_name, then we create a new policy.  The new
1737   //       policy is stored in pending_child_policy_ (replacing the one
1738   //       that was there before, which will be immediately shut down)
1739   //       and will later be swapped into child_policy_ by the helper
1740   //       when the new child transitions into state READY.
1741   const char* child_policy_name = child_policy_config_ == nullptr
1742                                       ? "round_robin"
1743                                       : child_policy_config_->name();
1744   const bool create_policy =
1745       // case 1
1746       child_policy_ == nullptr ||
1747       // case 2b
1748       (pending_child_policy_ == nullptr &&
1749        strcmp(child_policy_->name(), child_policy_name) != 0) ||
1750       // case 3b
1751       (pending_child_policy_ != nullptr &&
1752        strcmp(pending_child_policy_->name(), child_policy_name) != 0);
1753   LoadBalancingPolicy* policy_to_update = nullptr;
1754   if (create_policy) {
1755     // Cases 1, 2b, and 3b: create a new child policy.
1756     // If child_policy_ is null, we set it (case 1), else we set
1757     // pending_child_policy_ (cases 2b and 3b).
1758     if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1759       gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
1760               child_policy_ == nullptr ? "" : "pending ", child_policy_name);
1761     }
1762     auto new_policy =
1763         CreateChildPolicyLocked(child_policy_name, update_args.args);
1764     // Swap the policy into place.
1765     auto& lb_policy =
1766         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
1767     {
1768       MutexLock lock(&child_policy_mu_);
1769       lb_policy = std::move(new_policy);
1770     }
1771     policy_to_update = lb_policy.get();
1772   } else {
1773     // Cases 2a and 3a: update an existing policy.
1774     // If we have a pending child policy, send the update to the pending
1775     // policy (case 3a), else send it to the current policy (case 2a).
1776     policy_to_update = pending_child_policy_ != nullptr
1777                            ? pending_child_policy_.get()
1778                            : child_policy_.get();
1779   }
1780   GPR_ASSERT(policy_to_update != nullptr);
1781   // Update the policy.
1782   if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_glb_trace)) {
1783     gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
1784             policy_to_update == pending_child_policy_.get() ? "pending " : "",
1785             policy_to_update);
1786   }
1787   policy_to_update->UpdateLocked(std::move(update_args));
1788 }
1789
1790 //
1791 // factory
1792 //
1793
1794 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1795  public:
1796   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1797       LoadBalancingPolicy::Args args) const override {
1798     return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
1799   }
1800
1801   const char* name() const override { return kGrpclb; }
1802
1803   RefCountedPtr<ParsedLoadBalancingConfig> ParseLoadBalancingConfig(
1804       const grpc_json* json, grpc_error** error) const override {
1805     GPR_DEBUG_ASSERT(error != nullptr && *error == GRPC_ERROR_NONE);
1806     if (json == nullptr) {
1807       return RefCountedPtr<ParsedLoadBalancingConfig>(
1808           New<ParsedGrpcLbConfig>(nullptr));
1809     }
1810     InlinedVector<grpc_error*, 2> error_list;
1811     RefCountedPtr<ParsedLoadBalancingConfig> child_policy;
1812     for (const grpc_json* field = json->child; field != nullptr;
1813          field = field->next) {
1814       if (field->key == nullptr) continue;
1815       if (strcmp(field->key, "childPolicy") == 0) {
1816         if (child_policy != nullptr) {
1817           error_list.push_back(GRPC_ERROR_CREATE_FROM_STATIC_STRING(
1818               "field:childPolicy error:Duplicate entry"));
1819         }
1820         grpc_error* parse_error = GRPC_ERROR_NONE;
1821         child_policy = LoadBalancingPolicyRegistry::ParseLoadBalancingConfig(
1822             field, &parse_error);
1823         if (parse_error != GRPC_ERROR_NONE) {
1824           error_list.push_back(parse_error);
1825         }
1826       }
1827     }
1828     if (error_list.empty()) {
1829       return RefCountedPtr<ParsedLoadBalancingConfig>(
1830           New<ParsedGrpcLbConfig>(std::move(child_policy)));
1831     } else {
1832       *error = GRPC_ERROR_CREATE_FROM_VECTOR("GrpcLb Parser", &error_list);
1833       return nullptr;
1834     }
1835   }
1836 };
1837
1838 }  // namespace
1839
1840 }  // namespace grpc_core
1841
1842 //
1843 // Plugin registration
1844 //
1845
1846 namespace {
1847
1848 // Only add client_load_reporting filter if the grpclb LB policy is used.
1849 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1850                                             void* arg) {
1851   const grpc_channel_args* args =
1852       grpc_channel_stack_builder_get_channel_arguments(builder);
1853   const grpc_arg* channel_arg =
1854       grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1855   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1856       strcmp(channel_arg->value.string, "grpclb") == 0) {
1857     return grpc_channel_stack_builder_append_filter(
1858         builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1859   }
1860   return true;
1861 }
1862
1863 }  // namespace
1864
1865 void grpc_lb_policy_grpclb_init() {
1866   grpc_core::LoadBalancingPolicyRegistry::Builder::
1867       RegisterLoadBalancingPolicyFactory(
1868           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1869               grpc_core::New<grpc_core::GrpcLbFactory>()));
1870   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1871                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1872                                    maybe_add_client_load_reporting_filter,
1873                                    (void*)&grpc_client_load_reporting_filter);
1874 }
1875
1876 void grpc_lb_policy_grpclb_shutdown() {}