02fe06c4557d97bdf0562238db1260c84965baf9
[platform/upstream/grpc.git] / src / core / ext / filters / client_channel / lb_policy / grpclb / grpclb.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 /// Implementation of the gRPC LB policy.
20 ///
21 /// This policy takes as input a list of resolved addresses, which must
22 /// include at least one balancer address.
23 ///
24 /// An internal channel (\a lb_channel_) is created for the addresses
25 /// from that are balancers.  This channel behaves just like a regular
26 /// channel that uses pick_first to select from the list of balancer
27 /// addresses.
28 ///
29 /// When we get our initial update, we instantiate the internal *streaming*
30 /// call to the LB server (whichever address pick_first chose).  The call
31 /// will be complete when either the balancer sends status or when we cancel
32 /// the call (e.g., because we are shutting down).  In needed, we retry the
33 /// call.  If we received at least one valid message from the server, a new
34 /// call attempt will be made immediately; otherwise, we apply back-off
35 /// delays between attempts.
36 ///
37 /// We maintain an internal round_robin policy instance for distributing
38 /// requests across backends.  Whenever we receive a new serverlist from
39 /// the balancer, we update the round_robin policy with the new list of
40 /// addresses.  If we cannot communicate with the balancer on startup,
41 /// however, we may enter fallback mode, in which case we will populate
42 /// the child policy's addresses from the backend addresses returned by the
43 /// resolver.
44 ///
45 /// Once a child policy instance is in place (and getting updated as described),
46 /// calls for a pick, a ping, or a cancellation will be serviced right
47 /// away by forwarding them to the child policy instance.  Any time there's no
48 /// child policy available (i.e., right after the creation of the gRPCLB
49 /// policy), pick requests are queued.
50 ///
51 /// \see https://github.com/grpc/grpc/blob/master/doc/load-balancing.md for the
52 /// high level design and details.
53
54 // With the addition of a libuv endpoint, sockaddr.h now includes uv.h when
55 // using that endpoint. Because of various transitive includes in uv.h,
56 // including windows.h on Windows, uv.h must be included before other system
57 // headers. Therefore, sockaddr.h must always be included first.
58 #include <grpc/support/port_platform.h>
59
60 #include "src/core/lib/iomgr/sockaddr.h"
61 #include "src/core/lib/iomgr/socket_utils.h"
62
63 #include <inttypes.h>
64 #include <limits.h>
65 #include <string.h>
66
67 #include <grpc/byte_buffer_reader.h>
68 #include <grpc/grpc.h>
69 #include <grpc/support/alloc.h>
70 #include <grpc/support/string_util.h>
71 #include <grpc/support/time.h>
72
73 #include "src/core/ext/filters/client_channel/client_channel.h"
74 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/client_load_reporting_filter.h"
75 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.h"
76 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_channel.h"
77 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_client_stats.h"
78 #include "src/core/ext/filters/client_channel/lb_policy/grpclb/load_balancer_api.h"
79 #include "src/core/ext/filters/client_channel/lb_policy_factory.h"
80 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
81 #include "src/core/ext/filters/client_channel/parse_address.h"
82 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
83 #include "src/core/ext/filters/client_channel/server_address.h"
84 #include "src/core/lib/backoff/backoff.h"
85 #include "src/core/lib/channel/channel_args.h"
86 #include "src/core/lib/channel/channel_stack.h"
87 #include "src/core/lib/gpr/host_port.h"
88 #include "src/core/lib/gpr/string.h"
89 #include "src/core/lib/gprpp/manual_constructor.h"
90 #include "src/core/lib/gprpp/memory.h"
91 #include "src/core/lib/gprpp/mutex_lock.h"
92 #include "src/core/lib/gprpp/orphanable.h"
93 #include "src/core/lib/gprpp/ref_counted_ptr.h"
94 #include "src/core/lib/iomgr/combiner.h"
95 #include "src/core/lib/iomgr/sockaddr.h"
96 #include "src/core/lib/iomgr/sockaddr_utils.h"
97 #include "src/core/lib/iomgr/timer.h"
98 #include "src/core/lib/slice/slice_hash_table.h"
99 #include "src/core/lib/slice/slice_internal.h"
100 #include "src/core/lib/slice/slice_string_helpers.h"
101 #include "src/core/lib/surface/call.h"
102 #include "src/core/lib/surface/channel.h"
103 #include "src/core/lib/surface/channel_init.h"
104 #include "src/core/lib/transport/static_metadata.h"
105
106 #define GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS 1
107 #define GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER 1.6
108 #define GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS 120
109 #define GRPC_GRPCLB_RECONNECT_JITTER 0.2
110 #define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
111
112 #define GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN "grpc.grpclb_address_lb_token"
113
114 namespace grpc_core {
115
116 TraceFlag grpc_lb_glb_trace(false, "glb");
117
118 namespace {
119
120 constexpr char kGrpclb[] = "grpclb";
121
122 class GrpcLb : public LoadBalancingPolicy {
123  public:
124   explicit GrpcLb(Args args);
125
126   const char* name() const override { return kGrpclb; }
127
128   void UpdateLocked(UpdateArgs args) override;
129   void ResetBackoffLocked() override;
130   void FillChildRefsForChannelz(
131       channelz::ChildRefsList* child_subchannels,
132       channelz::ChildRefsList* child_channels) override;
133
134  private:
135   /// Contains a call to the LB server and all the data related to the call.
136   class BalancerCallState : public InternallyRefCounted<BalancerCallState> {
137    public:
138     explicit BalancerCallState(
139         RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy);
140
141     // It's the caller's responsibility to ensure that Orphan() is called from
142     // inside the combiner.
143     void Orphan() override;
144
145     void StartQuery();
146
147     GrpcLbClientStats* client_stats() const { return client_stats_.get(); }
148
149     bool seen_initial_response() const { return seen_initial_response_; }
150     bool seen_serverlist() const { return seen_serverlist_; }
151
152    private:
153     // So Delete() can access our private dtor.
154     template <typename T>
155     friend void grpc_core::Delete(T*);
156
157     ~BalancerCallState();
158
159     GrpcLb* grpclb_policy() const {
160       return static_cast<GrpcLb*>(grpclb_policy_.get());
161     }
162
163     void ScheduleNextClientLoadReportLocked();
164     void SendClientLoadReportLocked();
165
166     static bool LoadReportCountersAreZero(grpc_grpclb_request* request);
167
168     static void MaybeSendClientLoadReportLocked(void* arg, grpc_error* error);
169     static void ClientLoadReportDoneLocked(void* arg, grpc_error* error);
170     static void OnInitialRequestSentLocked(void* arg, grpc_error* error);
171     static void OnBalancerMessageReceivedLocked(void* arg, grpc_error* error);
172     static void OnBalancerStatusReceivedLocked(void* arg, grpc_error* error);
173
174     // The owning LB policy.
175     RefCountedPtr<LoadBalancingPolicy> grpclb_policy_;
176
177     // The streaming call to the LB server. Always non-NULL.
178     grpc_call* lb_call_ = nullptr;
179
180     // recv_initial_metadata
181     grpc_metadata_array lb_initial_metadata_recv_;
182
183     // send_message
184     grpc_byte_buffer* send_message_payload_ = nullptr;
185     grpc_closure lb_on_initial_request_sent_;
186
187     // recv_message
188     grpc_byte_buffer* recv_message_payload_ = nullptr;
189     grpc_closure lb_on_balancer_message_received_;
190     bool seen_initial_response_ = false;
191     bool seen_serverlist_ = false;
192
193     // recv_trailing_metadata
194     grpc_closure lb_on_balancer_status_received_;
195     grpc_metadata_array lb_trailing_metadata_recv_;
196     grpc_status_code lb_call_status_;
197     grpc_slice lb_call_status_details_;
198
199     // The stats for client-side load reporting associated with this LB call.
200     // Created after the first serverlist is received.
201     RefCountedPtr<GrpcLbClientStats> client_stats_;
202     grpc_millis client_stats_report_interval_ = 0;
203     grpc_timer client_load_report_timer_;
204     bool client_load_report_timer_callback_pending_ = false;
205     bool last_client_load_report_counters_were_zero_ = false;
206     bool client_load_report_is_due_ = false;
207     // The closure used for either the load report timer or the callback for
208     // completion of sending the load report.
209     grpc_closure client_load_report_closure_;
210   };
211
212   class Serverlist : public RefCounted<Serverlist> {
213    public:
214     // Takes ownership of serverlist.
215     explicit Serverlist(grpc_grpclb_serverlist* serverlist)
216         : serverlist_(serverlist) {}
217
218     ~Serverlist() { grpc_grpclb_destroy_serverlist(serverlist_); }
219
220     bool operator==(const Serverlist& other) const;
221
222     const grpc_grpclb_serverlist* serverlist() const { return serverlist_; }
223
224     // Returns a text representation suitable for logging.
225     UniquePtr<char> AsText() const;
226
227     // Extracts all non-drop entries into a ServerAddressList.
228     ServerAddressList GetServerAddressList(
229         GrpcLbClientStats* client_stats) const;
230
231     // Returns true if the serverlist contains at least one drop entry and
232     // no backend address entries.
233     bool ContainsAllDropEntries() const;
234
235     // Returns the LB token to use for a drop, or null if the call
236     // should not be dropped.
237     // Intended to be called from picker, so calls will be externally
238     // synchronized.
239     const char* ShouldDrop();
240
241    private:
242     grpc_grpclb_serverlist* serverlist_;
243     size_t drop_index_ = 0;
244   };
245
246   class Picker : public SubchannelPicker {
247    public:
248     Picker(GrpcLb* parent, RefCountedPtr<Serverlist> serverlist,
249            UniquePtr<SubchannelPicker> child_picker,
250            RefCountedPtr<GrpcLbClientStats> client_stats)
251         : parent_(parent),
252           serverlist_(std::move(serverlist)),
253           child_picker_(std::move(child_picker)),
254           client_stats_(std::move(client_stats)) {}
255
256     PickResult Pick(PickArgs* pick, grpc_error** error) override;
257
258    private:
259     // Storing the address for logging, but not holding a ref.
260     // DO NOT DEFERENCE!
261     GrpcLb* parent_;
262
263     // Serverlist to be used for determining drops.
264     RefCountedPtr<Serverlist> serverlist_;
265
266     UniquePtr<SubchannelPicker> child_picker_;
267     RefCountedPtr<GrpcLbClientStats> client_stats_;
268   };
269
270   class Helper : public ChannelControlHelper {
271    public:
272     explicit Helper(RefCountedPtr<GrpcLb> parent)
273         : parent_(std::move(parent)) {}
274
275     Subchannel* CreateSubchannel(const grpc_channel_args& args) override;
276     grpc_channel* CreateChannel(const char* target,
277                                 const grpc_channel_args& args) override;
278     void UpdateState(grpc_connectivity_state state, grpc_error* state_error,
279                      UniquePtr<SubchannelPicker> picker) override;
280     void RequestReresolution() override;
281
282     void set_child(LoadBalancingPolicy* child) { child_ = child; }
283
284    private:
285     bool CalledByPendingChild() const;
286     bool CalledByCurrentChild() const;
287
288     RefCountedPtr<GrpcLb> parent_;
289     LoadBalancingPolicy* child_ = nullptr;
290   };
291
292   ~GrpcLb();
293
294   void ShutdownLocked() override;
295
296   // Helper functions used in UpdateLocked().
297   void ProcessAddressesAndChannelArgsLocked(const ServerAddressList& addresses,
298                                             const grpc_channel_args& args);
299   void ParseLbConfig(Config* grpclb_config);
300   static void OnBalancerChannelConnectivityChangedLocked(void* arg,
301                                                          grpc_error* error);
302   void CancelBalancerChannelConnectivityWatchLocked();
303
304   // Methods for dealing with fallback state.
305   void MaybeEnterFallbackModeAfterStartup();
306   static void OnFallbackTimerLocked(void* arg, grpc_error* error);
307
308   // Methods for dealing with the balancer call.
309   void StartBalancerCallLocked();
310   void StartBalancerCallRetryTimerLocked();
311   static void OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error);
312
313   // Methods for dealing with the child policy.
314   grpc_channel_args* CreateChildPolicyArgsLocked(
315       bool is_backend_from_grpclb_load_balancer);
316   OrphanablePtr<LoadBalancingPolicy> CreateChildPolicyLocked(
317       const char* name, const grpc_channel_args* args);
318   void CreateOrUpdateChildPolicyLocked();
319
320   // Who the client is trying to communicate with.
321   const char* server_name_ = nullptr;
322
323   // Current channel args from the resolver.
324   grpc_channel_args* args_ = nullptr;
325
326   // Internal state.
327   bool shutting_down_ = false;
328
329   // The channel for communicating with the LB server.
330   grpc_channel* lb_channel_ = nullptr;
331   // Uuid of the lb channel. Used for channelz.
332   gpr_atm lb_channel_uuid_ = 0;
333   // Response generator to inject address updates into lb_channel_.
334   RefCountedPtr<FakeResolverResponseGenerator> response_generator_;
335
336   // The data associated with the current LB call. It holds a ref to this LB
337   // policy. It's initialized every time we query for backends. It's reset to
338   // NULL whenever the current LB call is no longer needed (e.g., the LB policy
339   // is shutting down, or the LB call has ended). A non-NULL lb_calld_ always
340   // contains a non-NULL lb_call_.
341   OrphanablePtr<BalancerCallState> lb_calld_;
342   // Timeout in milliseconds for the LB call. 0 means no deadline.
343   int lb_call_timeout_ms_ = 0;
344   // Balancer call retry state.
345   BackOff lb_call_backoff_;
346   bool retry_timer_callback_pending_ = false;
347   grpc_timer lb_call_retry_timer_;
348   grpc_closure lb_on_call_retry_;
349
350   // The deserialized response from the balancer. May be nullptr until one
351   // such response has arrived.
352   RefCountedPtr<Serverlist> serverlist_;
353
354   // Whether we're in fallback mode.
355   bool fallback_mode_ = false;
356   // The backend addresses from the resolver.
357   ServerAddressList fallback_backend_addresses_;
358   // State for fallback-at-startup checks.
359   // Timeout after startup after which we will go into fallback mode if
360   // we have not received a serverlist from the balancer.
361   int fallback_at_startup_timeout_ = 0;
362   bool fallback_at_startup_checks_pending_ = false;
363   grpc_timer lb_fallback_timer_;
364   grpc_closure lb_on_fallback_;
365   grpc_connectivity_state lb_channel_connectivity_ = GRPC_CHANNEL_IDLE;
366   grpc_closure lb_channel_on_connectivity_changed_;
367
368   // Lock held when modifying the value of child_policy_ or
369   // pending_child_policy_.
370   gpr_mu child_policy_mu_;
371   // The child policy to use for the backends.
372   OrphanablePtr<LoadBalancingPolicy> child_policy_;
373   // When switching child policies, the new policy will be stored here
374   // until it reports READY, at which point it will be moved to child_policy_.
375   OrphanablePtr<LoadBalancingPolicy> pending_child_policy_;
376   // The child policy config.
377   RefCountedPtr<Config> child_policy_config_;
378   // Child policy in state READY.
379   bool child_policy_ready_ = false;
380 };
381
382 //
383 // GrpcLb::Serverlist
384 //
385
386 bool GrpcLb::Serverlist::operator==(const Serverlist& other) const {
387   return grpc_grpclb_serverlist_equals(serverlist_, other.serverlist_);
388 }
389
390 void ParseServer(const grpc_grpclb_server* server,
391                  grpc_resolved_address* addr) {
392   memset(addr, 0, sizeof(*addr));
393   if (server->drop) return;
394   const uint16_t netorder_port = grpc_htons((uint16_t)server->port);
395   /* the addresses are given in binary format (a in(6)_addr struct) in
396    * server->ip_address.bytes. */
397   const grpc_grpclb_ip_address* ip = &server->ip_address;
398   if (ip->size == 4) {
399     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in));
400     grpc_sockaddr_in* addr4 = reinterpret_cast<grpc_sockaddr_in*>(&addr->addr);
401     addr4->sin_family = GRPC_AF_INET;
402     memcpy(&addr4->sin_addr, ip->bytes, ip->size);
403     addr4->sin_port = netorder_port;
404   } else if (ip->size == 16) {
405     addr->len = static_cast<socklen_t>(sizeof(grpc_sockaddr_in6));
406     grpc_sockaddr_in6* addr6 = (grpc_sockaddr_in6*)&addr->addr;
407     addr6->sin6_family = GRPC_AF_INET6;
408     memcpy(&addr6->sin6_addr, ip->bytes, ip->size);
409     addr6->sin6_port = netorder_port;
410   }
411 }
412
413 UniquePtr<char> GrpcLb::Serverlist::AsText() const {
414   gpr_strvec entries;
415   gpr_strvec_init(&entries);
416   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
417     const auto* server = serverlist_->servers[i];
418     char* ipport;
419     if (server->drop) {
420       ipport = gpr_strdup("(drop)");
421     } else {
422       grpc_resolved_address addr;
423       ParseServer(server, &addr);
424       grpc_sockaddr_to_string(&ipport, &addr, false);
425     }
426     char* entry;
427     gpr_asprintf(&entry, "  %" PRIuPTR ": %s token=%s\n", i, ipport,
428                  server->load_balance_token);
429     gpr_free(ipport);
430     gpr_strvec_add(&entries, entry);
431   }
432   UniquePtr<char> result(gpr_strvec_flatten(&entries, nullptr));
433   gpr_strvec_destroy(&entries);
434   return result;
435 }
436
437 // vtable for LB token channel arg.
438 void* lb_token_copy(void* token) {
439   return token == nullptr
440              ? nullptr
441              : (void*)GRPC_MDELEM_REF(grpc_mdelem{(uintptr_t)token}).payload;
442 }
443 void lb_token_destroy(void* token) {
444   if (token != nullptr) {
445     GRPC_MDELEM_UNREF(grpc_mdelem{(uintptr_t)token});
446   }
447 }
448 int lb_token_cmp(void* token1, void* token2) {
449   // Always indicate a match, since we don't want this channel arg to
450   // affect the subchannel's key in the index.
451   return 0;
452 }
453 const grpc_arg_pointer_vtable lb_token_arg_vtable = {
454     lb_token_copy, lb_token_destroy, lb_token_cmp};
455
456 bool IsServerValid(const grpc_grpclb_server* server, size_t idx, bool log) {
457   if (server->drop) return false;
458   const grpc_grpclb_ip_address* ip = &server->ip_address;
459   if (GPR_UNLIKELY(server->port >> 16 != 0)) {
460     if (log) {
461       gpr_log(GPR_ERROR,
462               "Invalid port '%d' at index %lu of serverlist. Ignoring.",
463               server->port, (unsigned long)idx);
464     }
465     return false;
466   }
467   if (GPR_UNLIKELY(ip->size != 4 && ip->size != 16)) {
468     if (log) {
469       gpr_log(GPR_ERROR,
470               "Expected IP to be 4 or 16 bytes, got %d at index %lu of "
471               "serverlist. Ignoring",
472               ip->size, (unsigned long)idx);
473     }
474     return false;
475   }
476   return true;
477 }
478
479 // Returns addresses extracted from the serverlist.
480 ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
481     GrpcLbClientStats* client_stats) const {
482   ServerAddressList addresses;
483   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
484     const grpc_grpclb_server* server = serverlist_->servers[i];
485     if (!IsServerValid(serverlist_->servers[i], i, false)) continue;
486     // Address processing.
487     grpc_resolved_address addr;
488     ParseServer(server, &addr);
489     // LB token processing.
490     grpc_mdelem lb_token;
491     if (server->has_load_balance_token) {
492       const size_t lb_token_max_length =
493           GPR_ARRAY_SIZE(server->load_balance_token);
494       const size_t lb_token_length =
495           strnlen(server->load_balance_token, lb_token_max_length);
496       grpc_slice lb_token_mdstr = grpc_slice_from_copied_buffer(
497           server->load_balance_token, lb_token_length);
498       lb_token = grpc_mdelem_from_slices(GRPC_MDSTR_LB_TOKEN, lb_token_mdstr);
499       if (client_stats != nullptr) {
500         GPR_ASSERT(grpc_mdelem_set_user_data(
501                        lb_token, GrpcLbClientStats::Destroy,
502                        client_stats->Ref().release()) == client_stats);
503       }
504     } else {
505       char* uri = grpc_sockaddr_to_uri(&addr);
506       gpr_log(GPR_INFO,
507               "Missing LB token for backend address '%s'. The empty token will "
508               "be used instead",
509               uri);
510       gpr_free(uri);
511       lb_token = GRPC_MDELEM_LB_TOKEN_EMPTY;
512     }
513     // Add address.
514     grpc_arg arg = grpc_channel_arg_pointer_create(
515         const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN),
516         (void*)lb_token.payload, &lb_token_arg_vtable);
517     grpc_channel_args* args = grpc_channel_args_copy_and_add(nullptr, &arg, 1);
518     addresses.emplace_back(addr, args);
519     // Clean up.
520     GRPC_MDELEM_UNREF(lb_token);
521   }
522   return addresses;
523 }
524
525 bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
526   if (serverlist_->num_servers == 0) return false;
527   for (size_t i = 0; i < serverlist_->num_servers; ++i) {
528     if (!serverlist_->servers[i]->drop) return false;
529   }
530   return true;
531 }
532
533 const char* GrpcLb::Serverlist::ShouldDrop() {
534   if (serverlist_->num_servers == 0) return nullptr;
535   grpc_grpclb_server* server = serverlist_->servers[drop_index_];
536   drop_index_ = (drop_index_ + 1) % serverlist_->num_servers;
537   return server->drop ? server->load_balance_token : nullptr;
538 }
539
540 //
541 // GrpcLb::Picker
542 //
543
544 GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs* pick, grpc_error** error) {
545   // Check if we should drop the call.
546   const char* drop_token = serverlist_->ShouldDrop();
547   if (drop_token != nullptr) {
548     // Update client load reporting stats to indicate the number of
549     // dropped calls.  Note that we have to do this here instead of in
550     // the client_load_reporting filter, because we do not create a
551     // subchannel call (and therefore no client_load_reporting filter)
552     // for dropped calls.
553     if (client_stats_ != nullptr) {
554       client_stats_->AddCallDroppedLocked(drop_token);
555     }
556     return PICK_COMPLETE;
557   }
558   // Forward pick to child policy.
559   PickResult result = child_picker_->Pick(pick, error);
560   // If pick succeeded, add LB token to initial metadata.
561   if (result == PickResult::PICK_COMPLETE &&
562       pick->connected_subchannel != nullptr) {
563     const grpc_arg* arg = grpc_channel_args_find(
564         pick->connected_subchannel->args(), GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN);
565     if (arg == nullptr) {
566       gpr_log(GPR_ERROR,
567               "[grpclb %p picker %p] No LB token for connected subchannel "
568               "pick %p",
569               parent_, this, pick);
570       abort();
571     }
572     grpc_mdelem lb_token = {reinterpret_cast<uintptr_t>(arg->value.pointer.p)};
573     GPR_ASSERT(!GRPC_MDISNULL(lb_token));
574     GPR_ASSERT(grpc_metadata_batch_add_tail(
575                    pick->initial_metadata, &pick->lb_token_mdelem_storage,
576                    GRPC_MDELEM_REF(lb_token)) == GRPC_ERROR_NONE);
577     GrpcLbClientStats* client_stats = static_cast<GrpcLbClientStats*>(
578         grpc_mdelem_get_user_data(lb_token, GrpcLbClientStats::Destroy));
579     if (client_stats != nullptr) {
580       client_stats->AddCallStarted();
581     }
582   }
583   return result;
584 }
585
586 //
587 // GrpcLb::Helper
588 //
589
590 bool GrpcLb::Helper::CalledByPendingChild() const {
591   GPR_ASSERT(child_ != nullptr);
592   return child_ == parent_->pending_child_policy_.get();
593 }
594
595 bool GrpcLb::Helper::CalledByCurrentChild() const {
596   GPR_ASSERT(child_ != nullptr);
597   return child_ == parent_->child_policy_.get();
598 }
599
600 Subchannel* GrpcLb::Helper::CreateSubchannel(const grpc_channel_args& args) {
601   if (parent_->shutting_down_ ||
602       (!CalledByPendingChild() && !CalledByCurrentChild())) {
603     return nullptr;
604   }
605   return parent_->channel_control_helper()->CreateSubchannel(args);
606 }
607
608 grpc_channel* GrpcLb::Helper::CreateChannel(const char* target,
609                                             const grpc_channel_args& args) {
610   if (parent_->shutting_down_ ||
611       (!CalledByPendingChild() && !CalledByCurrentChild())) {
612     return nullptr;
613   }
614   return parent_->channel_control_helper()->CreateChannel(target, args);
615 }
616
617 void GrpcLb::Helper::UpdateState(grpc_connectivity_state state,
618                                  grpc_error* state_error,
619                                  UniquePtr<SubchannelPicker> picker) {
620   if (parent_->shutting_down_) {
621     GRPC_ERROR_UNREF(state_error);
622     return;
623   }
624   // If this request is from the pending child policy, ignore it until
625   // it reports READY, at which point we swap it into place.
626   if (CalledByPendingChild()) {
627     if (grpc_lb_glb_trace.enabled()) {
628       gpr_log(GPR_INFO,
629               "[grpclb %p helper %p] pending child policy %p reports state=%s",
630               parent_.get(), this, parent_->pending_child_policy_.get(),
631               grpc_connectivity_state_name(state));
632     }
633     if (state != GRPC_CHANNEL_READY) {
634       GRPC_ERROR_UNREF(state_error);
635       return;
636     }
637     grpc_pollset_set_del_pollset_set(
638         parent_->child_policy_->interested_parties(),
639         parent_->interested_parties());
640     MutexLock lock(&parent_->child_policy_mu_);
641     parent_->child_policy_ = std::move(parent_->pending_child_policy_);
642   } else if (!CalledByCurrentChild()) {
643     // This request is from an outdated child, so ignore it.
644     GRPC_ERROR_UNREF(state_error);
645     return;
646   }
647   // Record whether child policy reports READY.
648   parent_->child_policy_ready_ = state == GRPC_CHANNEL_READY;
649   // Enter fallback mode if needed.
650   parent_->MaybeEnterFallbackModeAfterStartup();
651   // There are three cases to consider here:
652   // 1. We're in fallback mode.  In this case, we're always going to use
653   //    the child policy's result, so we pass its picker through as-is.
654   // 2. The serverlist contains only drop entries.  In this case, we
655   //    want to use our own picker so that we can return the drops.
656   // 3. Not in fallback mode and serverlist is not all drops (i.e., it
657   //    may be empty or contain at least one backend address).  There are
658   //    two sub-cases:
659   //    a. The child policy is reporting state READY.  In this case, we wrap
660   //       the child's picker in our own, so that we can handle drops and LB
661   //       token metadata for each pick.
662   //    b. The child policy is reporting a state other than READY.  In this
663   //       case, we don't want to use our own picker, because we don't want
664   //       to process drops for picks that yield a QUEUE result; this would
665   //       result in dropping too many calls, since we will see the
666   //       queued picks multiple times, and we'd consider each one a
667   //       separate call for the drop calculation.
668   //
669   // Cases 1 and 3b: return picker from the child policy as-is.
670   if (parent_->serverlist_ == nullptr ||
671       (!parent_->serverlist_->ContainsAllDropEntries() &&
672        state != GRPC_CHANNEL_READY)) {
673     if (grpc_lb_glb_trace.enabled()) {
674       gpr_log(GPR_INFO,
675               "[grpclb %p helper %p] state=%s passing child picker %p as-is",
676               parent_.get(), this, grpc_connectivity_state_name(state),
677               picker.get());
678     }
679     parent_->channel_control_helper()->UpdateState(state, state_error,
680                                                    std::move(picker));
681     return;
682   }
683   // Cases 2 and 3a: wrap picker from the child in our own picker.
684   if (grpc_lb_glb_trace.enabled()) {
685     gpr_log(GPR_INFO, "[grpclb %p helper %p] state=%s wrapping child picker %p",
686             parent_.get(), this, grpc_connectivity_state_name(state),
687             picker.get());
688   }
689   RefCountedPtr<GrpcLbClientStats> client_stats;
690   if (parent_->lb_calld_ != nullptr &&
691       parent_->lb_calld_->client_stats() != nullptr) {
692     client_stats = parent_->lb_calld_->client_stats()->Ref();
693   }
694   parent_->channel_control_helper()->UpdateState(
695       state, state_error,
696       UniquePtr<SubchannelPicker>(
697           New<Picker>(parent_.get(), parent_->serverlist_, std::move(picker),
698                       std::move(client_stats))));
699 }
700
701 void GrpcLb::Helper::RequestReresolution() {
702   if (parent_->shutting_down_) return;
703   const LoadBalancingPolicy* latest_child_policy =
704       parent_->pending_child_policy_ != nullptr
705           ? parent_->pending_child_policy_.get()
706           : parent_->child_policy_.get();
707   if (child_ != latest_child_policy) return;
708   if (grpc_lb_glb_trace.enabled()) {
709     gpr_log(GPR_INFO,
710             "[grpclb %p] Re-resolution requested from %schild policy (%p).",
711             parent_.get(), CalledByPendingChild() ? "pending " : "", child_);
712   }
713   // If we are talking to a balancer, we expect to get updated addresses
714   // from the balancer, so we can ignore the re-resolution request from
715   // the child policy. Otherwise, pass the re-resolution request up to the
716   // channel.
717   if (parent_->lb_calld_ == nullptr ||
718       !parent_->lb_calld_->seen_initial_response()) {
719     parent_->channel_control_helper()->RequestReresolution();
720   }
721 }
722
723 //
724 // GrpcLb::BalancerCallState
725 //
726
727 GrpcLb::BalancerCallState::BalancerCallState(
728     RefCountedPtr<LoadBalancingPolicy> parent_grpclb_policy)
729     : InternallyRefCounted<BalancerCallState>(&grpc_lb_glb_trace),
730       grpclb_policy_(std::move(parent_grpclb_policy)) {
731   GPR_ASSERT(grpclb_policy_ != nullptr);
732   GPR_ASSERT(!grpclb_policy()->shutting_down_);
733   // Init the LB call. Note that the LB call will progress every time there's
734   // activity in grpclb_policy_->interested_parties(), which is comprised of
735   // the polling entities from client_channel.
736   GPR_ASSERT(grpclb_policy()->server_name_ != nullptr);
737   GPR_ASSERT(grpclb_policy()->server_name_[0] != '\0');
738   const grpc_millis deadline =
739       grpclb_policy()->lb_call_timeout_ms_ == 0
740           ? GRPC_MILLIS_INF_FUTURE
741           : ExecCtx::Get()->Now() + grpclb_policy()->lb_call_timeout_ms_;
742   lb_call_ = grpc_channel_create_pollset_set_call(
743       grpclb_policy()->lb_channel_, nullptr, GRPC_PROPAGATE_DEFAULTS,
744       grpclb_policy_->interested_parties(),
745       GRPC_MDSTR_SLASH_GRPC_DOT_LB_DOT_V1_DOT_LOADBALANCER_SLASH_BALANCELOAD,
746       nullptr, deadline, nullptr);
747   // Init the LB call request payload.
748   grpc_grpclb_request* request =
749       grpc_grpclb_request_create(grpclb_policy()->server_name_);
750   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
751   send_message_payload_ =
752       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
753   grpc_slice_unref_internal(request_payload_slice);
754   grpc_grpclb_request_destroy(request);
755   // Init other data associated with the LB call.
756   grpc_metadata_array_init(&lb_initial_metadata_recv_);
757   grpc_metadata_array_init(&lb_trailing_metadata_recv_);
758   GRPC_CLOSURE_INIT(&lb_on_initial_request_sent_, OnInitialRequestSentLocked,
759                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
760   GRPC_CLOSURE_INIT(&lb_on_balancer_message_received_,
761                     OnBalancerMessageReceivedLocked, this,
762                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
763   GRPC_CLOSURE_INIT(&lb_on_balancer_status_received_,
764                     OnBalancerStatusReceivedLocked, this,
765                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
766 }
767
768 GrpcLb::BalancerCallState::~BalancerCallState() {
769   GPR_ASSERT(lb_call_ != nullptr);
770   grpc_call_unref(lb_call_);
771   grpc_metadata_array_destroy(&lb_initial_metadata_recv_);
772   grpc_metadata_array_destroy(&lb_trailing_metadata_recv_);
773   grpc_byte_buffer_destroy(send_message_payload_);
774   grpc_byte_buffer_destroy(recv_message_payload_);
775   grpc_slice_unref_internal(lb_call_status_details_);
776 }
777
778 void GrpcLb::BalancerCallState::Orphan() {
779   GPR_ASSERT(lb_call_ != nullptr);
780   // If we are here because grpclb_policy wants to cancel the call,
781   // lb_on_balancer_status_received_ will complete the cancellation and clean
782   // up. Otherwise, we are here because grpclb_policy has to orphan a failed
783   // call, then the following cancellation will be a no-op.
784   grpc_call_cancel(lb_call_, nullptr);
785   if (client_load_report_timer_callback_pending_) {
786     grpc_timer_cancel(&client_load_report_timer_);
787   }
788   // Note that the initial ref is hold by lb_on_balancer_status_received_
789   // instead of the caller of this function. So the corresponding unref happens
790   // in lb_on_balancer_status_received_ instead of here.
791 }
792
793 void GrpcLb::BalancerCallState::StartQuery() {
794   GPR_ASSERT(lb_call_ != nullptr);
795   if (grpc_lb_glb_trace.enabled()) {
796     gpr_log(GPR_INFO, "[grpclb %p] lb_calld=%p: Starting LB call %p",
797             grpclb_policy_.get(), this, lb_call_);
798   }
799   // Create the ops.
800   grpc_call_error call_error;
801   grpc_op ops[3];
802   memset(ops, 0, sizeof(ops));
803   // Op: send initial metadata.
804   grpc_op* op = ops;
805   op->op = GRPC_OP_SEND_INITIAL_METADATA;
806   op->data.send_initial_metadata.count = 0;
807   op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
808               GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
809   op->reserved = nullptr;
810   op++;
811   // Op: send request message.
812   GPR_ASSERT(send_message_payload_ != nullptr);
813   op->op = GRPC_OP_SEND_MESSAGE;
814   op->data.send_message.send_message = send_message_payload_;
815   op->flags = 0;
816   op->reserved = nullptr;
817   op++;
818   // TODO(roth): We currently track this ref manually.  Once the
819   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
820   // with the callback.
821   auto self = Ref(DEBUG_LOCATION, "on_initial_request_sent");
822   self.release();
823   call_error = grpc_call_start_batch_and_execute(
824       lb_call_, ops, (size_t)(op - ops), &lb_on_initial_request_sent_);
825   GPR_ASSERT(GRPC_CALL_OK == call_error);
826   // Op: recv initial metadata.
827   op = ops;
828   op->op = GRPC_OP_RECV_INITIAL_METADATA;
829   op->data.recv_initial_metadata.recv_initial_metadata =
830       &lb_initial_metadata_recv_;
831   op->flags = 0;
832   op->reserved = nullptr;
833   op++;
834   // Op: recv response.
835   op->op = GRPC_OP_RECV_MESSAGE;
836   op->data.recv_message.recv_message = &recv_message_payload_;
837   op->flags = 0;
838   op->reserved = nullptr;
839   op++;
840   // TODO(roth): We currently track this ref manually.  Once the
841   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
842   // with the callback.
843   self = Ref(DEBUG_LOCATION, "on_message_received");
844   self.release();
845   call_error = grpc_call_start_batch_and_execute(
846       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_message_received_);
847   GPR_ASSERT(GRPC_CALL_OK == call_error);
848   // Op: recv server status.
849   op = ops;
850   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
851   op->data.recv_status_on_client.trailing_metadata =
852       &lb_trailing_metadata_recv_;
853   op->data.recv_status_on_client.status = &lb_call_status_;
854   op->data.recv_status_on_client.status_details = &lb_call_status_details_;
855   op->flags = 0;
856   op->reserved = nullptr;
857   op++;
858   // This callback signals the end of the LB call, so it relies on the initial
859   // ref instead of a new ref. When it's invoked, it's the initial ref that is
860   // unreffed.
861   call_error = grpc_call_start_batch_and_execute(
862       lb_call_, ops, (size_t)(op - ops), &lb_on_balancer_status_received_);
863   GPR_ASSERT(GRPC_CALL_OK == call_error);
864 }
865
866 void GrpcLb::BalancerCallState::ScheduleNextClientLoadReportLocked() {
867   const grpc_millis next_client_load_report_time =
868       ExecCtx::Get()->Now() + client_stats_report_interval_;
869   GRPC_CLOSURE_INIT(&client_load_report_closure_,
870                     MaybeSendClientLoadReportLocked, this,
871                     grpc_combiner_scheduler(grpclb_policy()->combiner()));
872   grpc_timer_init(&client_load_report_timer_, next_client_load_report_time,
873                   &client_load_report_closure_);
874   client_load_report_timer_callback_pending_ = true;
875 }
876
877 void GrpcLb::BalancerCallState::MaybeSendClientLoadReportLocked(
878     void* arg, grpc_error* error) {
879   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
880   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
881   lb_calld->client_load_report_timer_callback_pending_ = false;
882   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
883     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
884     return;
885   }
886   // If we've already sent the initial request, then we can go ahead and send
887   // the load report. Otherwise, we need to wait until the initial request has
888   // been sent to send this (see OnInitialRequestSentLocked()).
889   if (lb_calld->send_message_payload_ == nullptr) {
890     lb_calld->SendClientLoadReportLocked();
891   } else {
892     lb_calld->client_load_report_is_due_ = true;
893   }
894 }
895
896 bool GrpcLb::BalancerCallState::LoadReportCountersAreZero(
897     grpc_grpclb_request* request) {
898   GrpcLbClientStats::DroppedCallCounts* drop_entries =
899       static_cast<GrpcLbClientStats::DroppedCallCounts*>(
900           request->client_stats.calls_finished_with_drop.arg);
901   return request->client_stats.num_calls_started == 0 &&
902          request->client_stats.num_calls_finished == 0 &&
903          request->client_stats.num_calls_finished_with_client_failed_to_send ==
904              0 &&
905          request->client_stats.num_calls_finished_known_received == 0 &&
906          (drop_entries == nullptr || drop_entries->size() == 0);
907 }
908
909 void GrpcLb::BalancerCallState::SendClientLoadReportLocked() {
910   // Construct message payload.
911   GPR_ASSERT(send_message_payload_ == nullptr);
912   grpc_grpclb_request* request =
913       grpc_grpclb_load_report_request_create_locked(client_stats_.get());
914   // Skip client load report if the counters were all zero in the last
915   // report and they are still zero in this one.
916   if (LoadReportCountersAreZero(request)) {
917     if (last_client_load_report_counters_were_zero_) {
918       grpc_grpclb_request_destroy(request);
919       ScheduleNextClientLoadReportLocked();
920       return;
921     }
922     last_client_load_report_counters_were_zero_ = true;
923   } else {
924     last_client_load_report_counters_were_zero_ = false;
925   }
926   grpc_slice request_payload_slice = grpc_grpclb_request_encode(request);
927   send_message_payload_ =
928       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
929   grpc_slice_unref_internal(request_payload_slice);
930   grpc_grpclb_request_destroy(request);
931   // Send the report.
932   grpc_op op;
933   memset(&op, 0, sizeof(op));
934   op.op = GRPC_OP_SEND_MESSAGE;
935   op.data.send_message.send_message = send_message_payload_;
936   GRPC_CLOSURE_INIT(&client_load_report_closure_, ClientLoadReportDoneLocked,
937                     this, grpc_combiner_scheduler(grpclb_policy()->combiner()));
938   grpc_call_error call_error = grpc_call_start_batch_and_execute(
939       lb_call_, &op, 1, &client_load_report_closure_);
940   if (GPR_UNLIKELY(call_error != GRPC_CALL_OK)) {
941     gpr_log(GPR_ERROR,
942             "[grpclb %p] lb_calld=%p call_error=%d sending client load report",
943             grpclb_policy_.get(), this, call_error);
944     GPR_ASSERT(GRPC_CALL_OK == call_error);
945   }
946 }
947
948 void GrpcLb::BalancerCallState::ClientLoadReportDoneLocked(void* arg,
949                                                            grpc_error* error) {
950   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
951   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
952   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
953   lb_calld->send_message_payload_ = nullptr;
954   if (error != GRPC_ERROR_NONE || lb_calld != grpclb_policy->lb_calld_.get()) {
955     lb_calld->Unref(DEBUG_LOCATION, "client_load_report");
956     return;
957   }
958   lb_calld->ScheduleNextClientLoadReportLocked();
959 }
960
961 void GrpcLb::BalancerCallState::OnInitialRequestSentLocked(void* arg,
962                                                            grpc_error* error) {
963   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
964   grpc_byte_buffer_destroy(lb_calld->send_message_payload_);
965   lb_calld->send_message_payload_ = nullptr;
966   // If we attempted to send a client load report before the initial request was
967   // sent (and this lb_calld is still in use), send the load report now.
968   if (lb_calld->client_load_report_is_due_ &&
969       lb_calld == lb_calld->grpclb_policy()->lb_calld_.get()) {
970     lb_calld->SendClientLoadReportLocked();
971     lb_calld->client_load_report_is_due_ = false;
972   }
973   lb_calld->Unref(DEBUG_LOCATION, "on_initial_request_sent");
974 }
975
976 void GrpcLb::BalancerCallState::OnBalancerMessageReceivedLocked(
977     void* arg, grpc_error* error) {
978   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
979   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
980   // Null payload means the LB call was cancelled.
981   if (lb_calld != grpclb_policy->lb_calld_.get() ||
982       lb_calld->recv_message_payload_ == nullptr) {
983     lb_calld->Unref(DEBUG_LOCATION, "on_message_received");
984     return;
985   }
986   grpc_byte_buffer_reader bbr;
987   grpc_byte_buffer_reader_init(&bbr, lb_calld->recv_message_payload_);
988   grpc_slice response_slice = grpc_byte_buffer_reader_readall(&bbr);
989   grpc_byte_buffer_reader_destroy(&bbr);
990   grpc_byte_buffer_destroy(lb_calld->recv_message_payload_);
991   lb_calld->recv_message_payload_ = nullptr;
992   grpc_grpclb_initial_response* initial_response;
993   grpc_grpclb_serverlist* serverlist;
994   if (!lb_calld->seen_initial_response_ &&
995       (initial_response = grpc_grpclb_initial_response_parse(response_slice)) !=
996           nullptr) {
997     // Have NOT seen initial response, look for initial response.
998     if (initial_response->has_client_stats_report_interval) {
999       lb_calld->client_stats_report_interval_ = GPR_MAX(
1000           GPR_MS_PER_SEC, grpc_grpclb_duration_to_millis(
1001                               &initial_response->client_stats_report_interval));
1002       if (grpc_lb_glb_trace.enabled()) {
1003         gpr_log(GPR_INFO,
1004                 "[grpclb %p] lb_calld=%p: Received initial LB response "
1005                 "message; client load reporting interval = %" PRId64
1006                 " milliseconds",
1007                 grpclb_policy, lb_calld,
1008                 lb_calld->client_stats_report_interval_);
1009       }
1010     } else if (grpc_lb_glb_trace.enabled()) {
1011       gpr_log(GPR_INFO,
1012               "[grpclb %p] lb_calld=%p: Received initial LB response message; "
1013               "client load reporting NOT enabled",
1014               grpclb_policy, lb_calld);
1015     }
1016     grpc_grpclb_initial_response_destroy(initial_response);
1017     lb_calld->seen_initial_response_ = true;
1018   } else if ((serverlist = grpc_grpclb_response_parse_serverlist(
1019                   response_slice)) != nullptr) {
1020     // Have seen initial response, look for serverlist.
1021     GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1022     auto serverlist_wrapper = MakeRefCounted<Serverlist>(serverlist);
1023     if (grpc_lb_glb_trace.enabled()) {
1024       UniquePtr<char> serverlist_text = serverlist_wrapper->AsText();
1025       gpr_log(GPR_INFO,
1026               "[grpclb %p] lb_calld=%p: Serverlist with %" PRIuPTR
1027               " servers received:\n%s",
1028               grpclb_policy, lb_calld, serverlist->num_servers,
1029               serverlist_text.get());
1030     }
1031     lb_calld->seen_serverlist_ = true;
1032     // Start sending client load report only after we start using the
1033     // serverlist returned from the current LB call.
1034     if (lb_calld->client_stats_report_interval_ > 0 &&
1035         lb_calld->client_stats_ == nullptr) {
1036       lb_calld->client_stats_ = MakeRefCounted<GrpcLbClientStats>();
1037       // Ref held by callback.
1038       lb_calld->Ref(DEBUG_LOCATION, "client_load_report").release();
1039       lb_calld->ScheduleNextClientLoadReportLocked();
1040     }
1041     // Check if the serverlist differs from the previous one.
1042     if (grpclb_policy->serverlist_ != nullptr &&
1043         *grpclb_policy->serverlist_ == *serverlist_wrapper) {
1044       if (grpc_lb_glb_trace.enabled()) {
1045         gpr_log(GPR_INFO,
1046                 "[grpclb %p] lb_calld=%p: Incoming server list identical to "
1047                 "current, ignoring.",
1048                 grpclb_policy, lb_calld);
1049       }
1050     } else {  // New serverlist.
1051       // Dispose of the fallback.
1052       // TODO(roth): Ideally, we should stay in fallback mode until we
1053       // know that we can reach at least one of the backends in the new
1054       // serverlist.  Unfortunately, we can't do that, since we need to
1055       // send the new addresses to the child policy in order to determine
1056       // if they are reachable, and if we don't exit fallback mode now,
1057       // CreateOrUpdateChildPolicyLocked() will use the fallback
1058       // addresses instead of the addresses from the new serverlist.
1059       // However, if we can't reach any of the servers in the new
1060       // serverlist, then the child policy will never switch away from
1061       // the fallback addresses, but the grpclb policy will still think
1062       // that we're not in fallback mode, which means that we won't send
1063       // updates to the child policy when the fallback addresses are
1064       // updated by the resolver.  This is sub-optimal, but the only way
1065       // to fix it is to maintain a completely separate child policy for
1066       // fallback mode, and that's more work than we want to put into
1067       // the grpclb implementation at this point, since we're deprecating
1068       // it in favor of the xds policy.  We will implement this the
1069       // right way in the xds policy instead.
1070       if (grpclb_policy->fallback_mode_) {
1071         gpr_log(GPR_INFO,
1072                 "[grpclb %p] Received response from balancer; exiting "
1073                 "fallback mode",
1074                 grpclb_policy);
1075         grpclb_policy->fallback_mode_ = false;
1076       }
1077       if (grpclb_policy->fallback_at_startup_checks_pending_) {
1078         grpclb_policy->fallback_at_startup_checks_pending_ = false;
1079         grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1080         grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1081       }
1082       // Update the serverlist in the GrpcLb instance. This serverlist
1083       // instance will be destroyed either upon the next update or when the
1084       // GrpcLb instance is destroyed.
1085       grpclb_policy->serverlist_ = std::move(serverlist_wrapper);
1086       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1087     }
1088   } else {
1089     // No valid initial response or serverlist found.
1090     char* response_slice_str =
1091         grpc_dump_slice(response_slice, GPR_DUMP_ASCII | GPR_DUMP_HEX);
1092     gpr_log(GPR_ERROR,
1093             "[grpclb %p] lb_calld=%p: Invalid LB response received: '%s'. "
1094             "Ignoring.",
1095             grpclb_policy, lb_calld, response_slice_str);
1096     gpr_free(response_slice_str);
1097   }
1098   grpc_slice_unref_internal(response_slice);
1099   if (!grpclb_policy->shutting_down_) {
1100     // Keep listening for serverlist updates.
1101     grpc_op op;
1102     memset(&op, 0, sizeof(op));
1103     op.op = GRPC_OP_RECV_MESSAGE;
1104     op.data.recv_message.recv_message = &lb_calld->recv_message_payload_;
1105     op.flags = 0;
1106     op.reserved = nullptr;
1107     // Reuse the "OnBalancerMessageReceivedLocked" ref taken in StartQuery().
1108     const grpc_call_error call_error = grpc_call_start_batch_and_execute(
1109         lb_calld->lb_call_, &op, 1,
1110         &lb_calld->lb_on_balancer_message_received_);
1111     GPR_ASSERT(GRPC_CALL_OK == call_error);
1112   } else {
1113     lb_calld->Unref(DEBUG_LOCATION, "on_message_received+grpclb_shutdown");
1114   }
1115 }
1116
1117 void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
1118     void* arg, grpc_error* error) {
1119   BalancerCallState* lb_calld = static_cast<BalancerCallState*>(arg);
1120   GrpcLb* grpclb_policy = lb_calld->grpclb_policy();
1121   GPR_ASSERT(lb_calld->lb_call_ != nullptr);
1122   if (grpc_lb_glb_trace.enabled()) {
1123     char* status_details =
1124         grpc_slice_to_c_string(lb_calld->lb_call_status_details_);
1125     gpr_log(GPR_INFO,
1126             "[grpclb %p] lb_calld=%p: Status from LB server received. "
1127             "Status = %d, details = '%s', (lb_call: %p), error '%s'",
1128             grpclb_policy, lb_calld, lb_calld->lb_call_status_, status_details,
1129             lb_calld->lb_call_, grpc_error_string(error));
1130     gpr_free(status_details);
1131   }
1132   // If this lb_calld is still in use, this call ended because of a failure so
1133   // we want to retry connecting. Otherwise, we have deliberately ended this
1134   // call and no further action is required.
1135   if (lb_calld == grpclb_policy->lb_calld_.get()) {
1136     // If we did not receive a serverlist and the fallback-at-startup checks
1137     // are pending, go into fallback mode immediately.  This short-circuits
1138     // the timeout for the fallback-at-startup case.
1139     if (!lb_calld->seen_serverlist_ &&
1140         grpclb_policy->fallback_at_startup_checks_pending_) {
1141       gpr_log(GPR_INFO,
1142               "[grpclb %p] balancer call finished without receiving "
1143               "serverlist; entering fallback mode",
1144               grpclb_policy);
1145       grpclb_policy->fallback_at_startup_checks_pending_ = false;
1146       grpc_timer_cancel(&grpclb_policy->lb_fallback_timer_);
1147       grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1148       grpclb_policy->fallback_mode_ = true;
1149       grpclb_policy->CreateOrUpdateChildPolicyLocked();
1150     } else {
1151       // This handles the fallback-after-startup case.
1152       grpclb_policy->MaybeEnterFallbackModeAfterStartup();
1153     }
1154     grpclb_policy->lb_calld_.reset();
1155     GPR_ASSERT(!grpclb_policy->shutting_down_);
1156     grpclb_policy->channel_control_helper()->RequestReresolution();
1157     if (lb_calld->seen_initial_response_) {
1158       // If we lose connection to the LB server, reset the backoff and restart
1159       // the LB call immediately.
1160       grpclb_policy->lb_call_backoff_.Reset();
1161       grpclb_policy->StartBalancerCallLocked();
1162     } else {
1163       // If this LB call fails establishing any connection to the LB server,
1164       // retry later.
1165       grpclb_policy->StartBalancerCallRetryTimerLocked();
1166     }
1167   }
1168   lb_calld->Unref(DEBUG_LOCATION, "lb_call_ended");
1169 }
1170
1171 //
1172 // helper code for creating balancer channel
1173 //
1174
1175 ServerAddressList ExtractBalancerAddresses(const ServerAddressList& addresses) {
1176   ServerAddressList balancer_addresses;
1177   for (size_t i = 0; i < addresses.size(); ++i) {
1178     if (addresses[i].IsBalancer()) {
1179       // Strip out the is_balancer channel arg, since we don't want to
1180       // recursively use the grpclb policy in the channel used to talk to
1181       // the balancers.  Note that we do NOT strip out the balancer_name
1182       // channel arg, since we need that to set the authority correctly
1183       // to talk to the balancers.
1184       static const char* args_to_remove[] = {
1185           GRPC_ARG_ADDRESS_IS_BALANCER,
1186       };
1187       balancer_addresses.emplace_back(
1188           addresses[i].address(),
1189           grpc_channel_args_copy_and_remove(addresses[i].args(), args_to_remove,
1190                                             GPR_ARRAY_SIZE(args_to_remove)));
1191     }
1192   }
1193   return balancer_addresses;
1194 }
1195
1196 /* Returns the channel args for the LB channel, used to create a bidirectional
1197  * stream for the reception of load balancing updates.
1198  *
1199  * Inputs:
1200  *   - \a addresses: corresponding to the balancers.
1201  *   - \a response_generator: in order to propagate updates from the resolver
1202  *   above the grpclb policy.
1203  *   - \a args: other args inherited from the grpclb policy. */
1204 grpc_channel_args* BuildBalancerChannelArgs(
1205     const ServerAddressList& addresses,
1206     FakeResolverResponseGenerator* response_generator,
1207     const grpc_channel_args* args) {
1208   // Channel args to remove.
1209   static const char* args_to_remove[] = {
1210       // LB policy name, since we want to use the default (pick_first) in
1211       // the LB channel.
1212       GRPC_ARG_LB_POLICY_NAME,
1213       // Strip out the service config, since we don't want the LB policy
1214       // config specified for the parent channel to affect the LB channel.
1215       GRPC_ARG_SERVICE_CONFIG,
1216       // The channel arg for the server URI, since that will be different for
1217       // the LB channel than for the parent channel.  The client channel
1218       // factory will re-add this arg with the right value.
1219       GRPC_ARG_SERVER_URI,
1220       // The fake resolver response generator, because we are replacing it
1221       // with the one from the grpclb policy, used to propagate updates to
1222       // the LB channel.
1223       GRPC_ARG_FAKE_RESOLVER_RESPONSE_GENERATOR,
1224       // The LB channel should use the authority indicated by the target
1225       // authority table (see \a grpc_lb_policy_grpclb_modify_lb_channel_args),
1226       // as opposed to the authority from the parent channel.
1227       GRPC_ARG_DEFAULT_AUTHORITY,
1228       // Just as for \a GRPC_ARG_DEFAULT_AUTHORITY, the LB channel should be
1229       // treated as a stand-alone channel and not inherit this argument from the
1230       // args of the parent channel.
1231       GRPC_SSL_TARGET_NAME_OVERRIDE_ARG,
1232   };
1233   // Channel args to add.
1234   const grpc_arg args_to_add[] = {
1235       // The fake resolver response generator, which we use to inject
1236       // address updates into the LB channel.
1237       grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
1238           response_generator),
1239       // A channel arg indicating the target is a grpclb load balancer.
1240       grpc_channel_arg_integer_create(
1241           const_cast<char*>(GRPC_ARG_ADDRESS_IS_GRPCLB_LOAD_BALANCER), 1),
1242       // A channel arg indicating this is an internal channels, aka it is
1243       // owned by components in Core, not by the user application.
1244       grpc_channel_arg_integer_create(
1245           const_cast<char*>(GRPC_ARG_CHANNELZ_CHANNEL_IS_INTERNAL_CHANNEL), 1),
1246   };
1247   // Construct channel args.
1248   grpc_channel_args* new_args = grpc_channel_args_copy_and_add_and_remove(
1249       args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), args_to_add,
1250       GPR_ARRAY_SIZE(args_to_add));
1251   // Make any necessary modifications for security.
1252   return grpc_lb_policy_grpclb_modify_lb_channel_args(addresses, new_args);
1253 }
1254
1255 //
1256 // ctor and dtor
1257 //
1258
1259 GrpcLb::GrpcLb(Args args)
1260     : LoadBalancingPolicy(std::move(args)),
1261       response_generator_(MakeRefCounted<FakeResolverResponseGenerator>()),
1262       lb_call_backoff_(
1263           BackOff::Options()
1264               .set_initial_backoff(GRPC_GRPCLB_INITIAL_CONNECT_BACKOFF_SECONDS *
1265                                    1000)
1266               .set_multiplier(GRPC_GRPCLB_RECONNECT_BACKOFF_MULTIPLIER)
1267               .set_jitter(GRPC_GRPCLB_RECONNECT_JITTER)
1268               .set_max_backoff(GRPC_GRPCLB_RECONNECT_MAX_BACKOFF_SECONDS *
1269                                1000)) {
1270   // Initialization.
1271   GRPC_CLOSURE_INIT(&lb_on_fallback_, &GrpcLb::OnFallbackTimerLocked, this,
1272                     grpc_combiner_scheduler(combiner()));
1273   GRPC_CLOSURE_INIT(&lb_channel_on_connectivity_changed_,
1274                     &GrpcLb::OnBalancerChannelConnectivityChangedLocked, this,
1275                     grpc_combiner_scheduler(args.combiner));
1276   gpr_mu_init(&child_policy_mu_);
1277   // Record server name.
1278   const grpc_arg* arg = grpc_channel_args_find(args.args, GRPC_ARG_SERVER_URI);
1279   const char* server_uri = grpc_channel_arg_get_string(arg);
1280   GPR_ASSERT(server_uri != nullptr);
1281   grpc_uri* uri = grpc_uri_parse(server_uri, true);
1282   GPR_ASSERT(uri->path[0] != '\0');
1283   server_name_ = gpr_strdup(uri->path[0] == '/' ? uri->path + 1 : uri->path);
1284   if (grpc_lb_glb_trace.enabled()) {
1285     gpr_log(GPR_INFO,
1286             "[grpclb %p] Will use '%s' as the server name for LB request.",
1287             this, server_name_);
1288   }
1289   grpc_uri_destroy(uri);
1290   // Record LB call timeout.
1291   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_CALL_TIMEOUT_MS);
1292   lb_call_timeout_ms_ = grpc_channel_arg_get_integer(arg, {0, 0, INT_MAX});
1293   // Record fallback-at-startup timeout.
1294   arg = grpc_channel_args_find(args.args, GRPC_ARG_GRPCLB_FALLBACK_TIMEOUT_MS);
1295   fallback_at_startup_timeout_ = grpc_channel_arg_get_integer(
1296       arg, {GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS, 0, INT_MAX});
1297 }
1298
1299 GrpcLb::~GrpcLb() {
1300   gpr_free((void*)server_name_);
1301   grpc_channel_args_destroy(args_);
1302   gpr_mu_destroy(&child_policy_mu_);
1303 }
1304
1305 void GrpcLb::ShutdownLocked() {
1306   shutting_down_ = true;
1307   lb_calld_.reset();
1308   if (retry_timer_callback_pending_) {
1309     grpc_timer_cancel(&lb_call_retry_timer_);
1310   }
1311   if (fallback_at_startup_checks_pending_) {
1312     grpc_timer_cancel(&lb_fallback_timer_);
1313     CancelBalancerChannelConnectivityWatchLocked();
1314   }
1315   if (child_policy_ != nullptr) {
1316     grpc_pollset_set_del_pollset_set(child_policy_->interested_parties(),
1317                                      interested_parties());
1318   }
1319   if (pending_child_policy_ != nullptr) {
1320     grpc_pollset_set_del_pollset_set(
1321         pending_child_policy_->interested_parties(), interested_parties());
1322   }
1323   {
1324     MutexLock lock(&child_policy_mu_);
1325     child_policy_.reset();
1326     pending_child_policy_.reset();
1327   }
1328   // We destroy the LB channel here instead of in our destructor because
1329   // destroying the channel triggers a last callback to
1330   // OnBalancerChannelConnectivityChangedLocked(), and we need to be
1331   // alive when that callback is invoked.
1332   if (lb_channel_ != nullptr) {
1333     grpc_channel_destroy(lb_channel_);
1334     lb_channel_ = nullptr;
1335     gpr_atm_no_barrier_store(&lb_channel_uuid_, 0);
1336   }
1337 }
1338
1339 //
1340 // public methods
1341 //
1342
1343 void GrpcLb::ResetBackoffLocked() {
1344   if (lb_channel_ != nullptr) {
1345     grpc_channel_reset_connect_backoff(lb_channel_);
1346   }
1347   if (child_policy_ != nullptr) {
1348     child_policy_->ResetBackoffLocked();
1349   }
1350   if (pending_child_policy_ != nullptr) {
1351     pending_child_policy_->ResetBackoffLocked();
1352   }
1353 }
1354
1355 void GrpcLb::FillChildRefsForChannelz(
1356     channelz::ChildRefsList* child_subchannels,
1357     channelz::ChildRefsList* child_channels) {
1358   {
1359     // Delegate to the child policy to fill the children subchannels.
1360     // This must be done holding child_policy_mu_, since this method
1361     // does not run in the combiner.
1362     MutexLock lock(&child_policy_mu_);
1363     if (child_policy_ != nullptr) {
1364       child_policy_->FillChildRefsForChannelz(child_subchannels,
1365                                               child_channels);
1366     }
1367     if (pending_child_policy_ != nullptr) {
1368       pending_child_policy_->FillChildRefsForChannelz(child_subchannels,
1369                                                       child_channels);
1370     }
1371   }
1372   gpr_atm uuid = gpr_atm_no_barrier_load(&lb_channel_uuid_);
1373   if (uuid != 0) {
1374     child_channels->push_back(uuid);
1375   }
1376 }
1377
1378 void GrpcLb::UpdateLocked(UpdateArgs args) {
1379   const bool is_initial_update = lb_channel_ == nullptr;
1380   ParseLbConfig(args.config.get());
1381   ProcessAddressesAndChannelArgsLocked(args.addresses, *args.args);
1382   // Update the existing child policy.
1383   if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
1384   // If this is the initial update, start the fallback-at-startup checks
1385   // and the balancer call.
1386   if (is_initial_update) {
1387     fallback_at_startup_checks_pending_ = true;
1388     // Start timer.
1389     grpc_millis deadline = ExecCtx::Get()->Now() + fallback_at_startup_timeout_;
1390     Ref(DEBUG_LOCATION, "on_fallback_timer").release();  // Ref for callback
1391     grpc_timer_init(&lb_fallback_timer_, deadline, &lb_on_fallback_);
1392     // Start watching the channel's connectivity state.  If the channel
1393     // goes into state TRANSIENT_FAILURE before the timer fires, we go into
1394     // fallback mode even if the fallback timeout has not elapsed.
1395     grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1396         grpc_channel_get_channel_stack(lb_channel_));
1397     GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1398     // Ref held by callback.
1399     Ref(DEBUG_LOCATION, "watch_lb_channel_connectivity").release();
1400     grpc_client_channel_watch_connectivity_state(
1401         client_channel_elem,
1402         grpc_polling_entity_create_from_pollset_set(interested_parties()),
1403         &lb_channel_connectivity_, &lb_channel_on_connectivity_changed_,
1404         nullptr);
1405     // Start balancer call.
1406     StartBalancerCallLocked();
1407   }
1408 }
1409
1410 //
1411 // helpers for UpdateLocked()
1412 //
1413
1414 // Returns the backend addresses extracted from the given addresses.
1415 ServerAddressList ExtractBackendAddresses(const ServerAddressList& addresses) {
1416   void* lb_token = (void*)GRPC_MDELEM_LB_TOKEN_EMPTY.payload;
1417   grpc_arg arg = grpc_channel_arg_pointer_create(
1418       const_cast<char*>(GRPC_ARG_GRPCLB_ADDRESS_LB_TOKEN), lb_token,
1419       &lb_token_arg_vtable);
1420   ServerAddressList backend_addresses;
1421   for (size_t i = 0; i < addresses.size(); ++i) {
1422     if (!addresses[i].IsBalancer()) {
1423       backend_addresses.emplace_back(
1424           addresses[i].address(),
1425           grpc_channel_args_copy_and_add(addresses[i].args(), &arg, 1));
1426     }
1427   }
1428   return backend_addresses;
1429 }
1430
1431 void GrpcLb::ProcessAddressesAndChannelArgsLocked(
1432     const ServerAddressList& addresses, const grpc_channel_args& args) {
1433   // Update fallback address list.
1434   fallback_backend_addresses_ = ExtractBackendAddresses(addresses);
1435   // Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
1436   // since we use this to trigger the client_load_reporting filter.
1437   static const char* args_to_remove[] = {GRPC_ARG_LB_POLICY_NAME};
1438   grpc_arg new_arg = grpc_channel_arg_string_create(
1439       (char*)GRPC_ARG_LB_POLICY_NAME, (char*)"grpclb");
1440   grpc_channel_args_destroy(args_);
1441   args_ = grpc_channel_args_copy_and_add_and_remove(
1442       &args, args_to_remove, GPR_ARRAY_SIZE(args_to_remove), &new_arg, 1);
1443   // Construct args for balancer channel.
1444   ServerAddressList balancer_addresses = ExtractBalancerAddresses(addresses);
1445   grpc_channel_args* lb_channel_args = BuildBalancerChannelArgs(
1446       balancer_addresses, response_generator_.get(), &args);
1447   // Create balancer channel if needed.
1448   if (lb_channel_ == nullptr) {
1449     char* uri_str;
1450     gpr_asprintf(&uri_str, "fake:///%s", server_name_);
1451     lb_channel_ =
1452         channel_control_helper()->CreateChannel(uri_str, *lb_channel_args);
1453     GPR_ASSERT(lb_channel_ != nullptr);
1454     grpc_core::channelz::ChannelNode* channel_node =
1455         grpc_channel_get_channelz_node(lb_channel_);
1456     if (channel_node != nullptr) {
1457       gpr_atm_no_barrier_store(&lb_channel_uuid_, channel_node->uuid());
1458     }
1459     gpr_free(uri_str);
1460   }
1461   // Propagate updates to the LB channel (pick_first) through the fake
1462   // resolver.
1463   Resolver::Result result;
1464   result.addresses = std::move(balancer_addresses);
1465   result.args = lb_channel_args;
1466   response_generator_->SetResponse(std::move(result));
1467 }
1468
1469 void GrpcLb::ParseLbConfig(Config* grpclb_config) {
1470   const grpc_json* child_policy = nullptr;
1471   if (grpclb_config != nullptr) {
1472     const grpc_json* grpclb_config_json = grpclb_config->config();
1473     for (const grpc_json* field = grpclb_config_json; field != nullptr;
1474          field = field->next) {
1475       if (field->key == nullptr) return;
1476       if (strcmp(field->key, "childPolicy") == 0) {
1477         if (child_policy != nullptr) return;  // Duplicate.
1478         child_policy = ParseLoadBalancingConfig(field);
1479       }
1480     }
1481   }
1482   if (child_policy != nullptr) {
1483     child_policy_config_ =
1484         MakeRefCounted<Config>(child_policy, grpclb_config->service_config());
1485   } else {
1486     child_policy_config_.reset();
1487   }
1488 }
1489
1490 void GrpcLb::OnBalancerChannelConnectivityChangedLocked(void* arg,
1491                                                         grpc_error* error) {
1492   GrpcLb* self = static_cast<GrpcLb*>(arg);
1493   if (!self->shutting_down_ && self->fallback_at_startup_checks_pending_) {
1494     if (self->lb_channel_connectivity_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
1495       // Not in TRANSIENT_FAILURE.  Renew connectivity watch.
1496       grpc_channel_element* client_channel_elem =
1497           grpc_channel_stack_last_element(
1498               grpc_channel_get_channel_stack(self->lb_channel_));
1499       GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1500       grpc_client_channel_watch_connectivity_state(
1501           client_channel_elem,
1502           grpc_polling_entity_create_from_pollset_set(
1503               self->interested_parties()),
1504           &self->lb_channel_connectivity_,
1505           &self->lb_channel_on_connectivity_changed_, nullptr);
1506       return;  // Early out so we don't drop the ref below.
1507     }
1508     // In TRANSIENT_FAILURE.  Cancel the fallback timer and go into
1509     // fallback mode immediately.
1510     gpr_log(GPR_INFO,
1511             "[grpclb %p] balancer channel in state TRANSIENT_FAILURE; "
1512             "entering fallback mode",
1513             self);
1514     self->fallback_at_startup_checks_pending_ = false;
1515     grpc_timer_cancel(&self->lb_fallback_timer_);
1516     self->fallback_mode_ = true;
1517     self->CreateOrUpdateChildPolicyLocked();
1518   }
1519   // Done watching connectivity state, so drop ref.
1520   self->Unref(DEBUG_LOCATION, "watch_lb_channel_connectivity");
1521 }
1522
1523 void GrpcLb::CancelBalancerChannelConnectivityWatchLocked() {
1524   grpc_channel_element* client_channel_elem = grpc_channel_stack_last_element(
1525       grpc_channel_get_channel_stack(lb_channel_));
1526   GPR_ASSERT(client_channel_elem->filter == &grpc_client_channel_filter);
1527   grpc_client_channel_watch_connectivity_state(
1528       client_channel_elem,
1529       grpc_polling_entity_create_from_pollset_set(interested_parties()),
1530       nullptr, &lb_channel_on_connectivity_changed_, nullptr);
1531 }
1532
1533 //
1534 // code for balancer channel and call
1535 //
1536
1537 void GrpcLb::StartBalancerCallLocked() {
1538   GPR_ASSERT(lb_channel_ != nullptr);
1539   if (shutting_down_) return;
1540   // Init the LB call data.
1541   GPR_ASSERT(lb_calld_ == nullptr);
1542   lb_calld_ = MakeOrphanable<BalancerCallState>(Ref());
1543   if (grpc_lb_glb_trace.enabled()) {
1544     gpr_log(GPR_INFO,
1545             "[grpclb %p] Query for backends (lb_channel: %p, lb_calld: %p)",
1546             this, lb_channel_, lb_calld_.get());
1547   }
1548   lb_calld_->StartQuery();
1549 }
1550
1551 void GrpcLb::StartBalancerCallRetryTimerLocked() {
1552   grpc_millis next_try = lb_call_backoff_.NextAttemptTime();
1553   if (grpc_lb_glb_trace.enabled()) {
1554     gpr_log(GPR_INFO, "[grpclb %p] Connection to LB server lost...", this);
1555     grpc_millis timeout = next_try - ExecCtx::Get()->Now();
1556     if (timeout > 0) {
1557       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active in %" PRId64 "ms.",
1558               this, timeout);
1559     } else {
1560       gpr_log(GPR_INFO, "[grpclb %p] ... retry_timer_active immediately.",
1561               this);
1562     }
1563   }
1564   // TODO(roth): We currently track this ref manually.  Once the
1565   // ClosureRef API is ready, we should pass the RefCountedPtr<> along
1566   // with the callback.
1567   auto self = Ref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1568   self.release();
1569   GRPC_CLOSURE_INIT(&lb_on_call_retry_, &GrpcLb::OnBalancerCallRetryTimerLocked,
1570                     this, grpc_combiner_scheduler(combiner()));
1571   retry_timer_callback_pending_ = true;
1572   grpc_timer_init(&lb_call_retry_timer_, next_try, &lb_on_call_retry_);
1573 }
1574
1575 void GrpcLb::OnBalancerCallRetryTimerLocked(void* arg, grpc_error* error) {
1576   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1577   grpclb_policy->retry_timer_callback_pending_ = false;
1578   if (!grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE &&
1579       grpclb_policy->lb_calld_ == nullptr) {
1580     if (grpc_lb_glb_trace.enabled()) {
1581       gpr_log(GPR_INFO, "[grpclb %p] Restarting call to LB server",
1582               grpclb_policy);
1583     }
1584     grpclb_policy->StartBalancerCallLocked();
1585   }
1586   grpclb_policy->Unref(DEBUG_LOCATION, "on_balancer_call_retry_timer");
1587 }
1588
1589 //
1590 // code for handling fallback mode
1591 //
1592
1593 void GrpcLb::MaybeEnterFallbackModeAfterStartup() {
1594   // Enter fallback mode if all of the following are true:
1595   // - We are not currently in fallback mode.
1596   // - We are not currently waiting for the initial fallback timeout.
1597   // - We are not currently in contact with the balancer.
1598   // - The child policy is not in state READY.
1599   if (!fallback_mode_ && !fallback_at_startup_checks_pending_ &&
1600       (lb_calld_ == nullptr || !lb_calld_->seen_serverlist()) &&
1601       !child_policy_ready_) {
1602     gpr_log(GPR_INFO,
1603             "[grpclb %p] lost contact with balancer and backends from "
1604             "most recent serverlist; entering fallback mode",
1605             this);
1606     fallback_mode_ = true;
1607     CreateOrUpdateChildPolicyLocked();
1608   }
1609 }
1610
1611 void GrpcLb::OnFallbackTimerLocked(void* arg, grpc_error* error) {
1612   GrpcLb* grpclb_policy = static_cast<GrpcLb*>(arg);
1613   // If we receive a serverlist after the timer fires but before this callback
1614   // actually runs, don't fall back.
1615   if (grpclb_policy->fallback_at_startup_checks_pending_ &&
1616       !grpclb_policy->shutting_down_ && error == GRPC_ERROR_NONE) {
1617     gpr_log(GPR_INFO,
1618             "[grpclb %p] No response from balancer after fallback timeout; "
1619             "entering fallback mode",
1620             grpclb_policy);
1621     grpclb_policy->fallback_at_startup_checks_pending_ = false;
1622     grpclb_policy->CancelBalancerChannelConnectivityWatchLocked();
1623     grpclb_policy->fallback_mode_ = true;
1624     grpclb_policy->CreateOrUpdateChildPolicyLocked();
1625   }
1626   grpclb_policy->Unref(DEBUG_LOCATION, "on_fallback_timer");
1627 }
1628
1629 //
1630 // code for interacting with the child policy
1631 //
1632
1633 grpc_channel_args* GrpcLb::CreateChildPolicyArgsLocked(
1634     bool is_backend_from_grpclb_load_balancer) {
1635   grpc_arg args_to_add[2] = {
1636       // A channel arg indicating if the target is a backend inferred from a
1637       // grpclb load balancer.
1638       grpc_channel_arg_integer_create(
1639           const_cast<char*>(
1640               GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER),
1641           is_backend_from_grpclb_load_balancer),
1642   };
1643   size_t num_args_to_add = 1;
1644   if (is_backend_from_grpclb_load_balancer) {
1645     args_to_add[num_args_to_add++] = grpc_channel_arg_integer_create(
1646         const_cast<char*>(GRPC_ARG_INHIBIT_HEALTH_CHECKING), 1);
1647   }
1648   return grpc_channel_args_copy_and_add(args_, args_to_add, num_args_to_add);
1649 }
1650
1651 OrphanablePtr<LoadBalancingPolicy> GrpcLb::CreateChildPolicyLocked(
1652     const char* name, const grpc_channel_args* args) {
1653   Helper* helper = New<Helper>(Ref());
1654   LoadBalancingPolicy::Args lb_policy_args;
1655   lb_policy_args.combiner = combiner();
1656   lb_policy_args.args = args;
1657   lb_policy_args.channel_control_helper =
1658       UniquePtr<ChannelControlHelper>(helper);
1659   OrphanablePtr<LoadBalancingPolicy> lb_policy =
1660       LoadBalancingPolicyRegistry::CreateLoadBalancingPolicy(
1661           name, std::move(lb_policy_args));
1662   if (GPR_UNLIKELY(lb_policy == nullptr)) {
1663     gpr_log(GPR_ERROR, "[grpclb %p] Failure creating child policy %s", this,
1664             name);
1665     return nullptr;
1666   }
1667   helper->set_child(lb_policy.get());
1668   if (grpc_lb_glb_trace.enabled()) {
1669     gpr_log(GPR_INFO, "[grpclb %p] Created new child policy %s (%p)", this,
1670             name, lb_policy.get());
1671   }
1672   // Add the gRPC LB's interested_parties pollset_set to that of the newly
1673   // created child policy. This will make the child policy progress upon
1674   // activity on gRPC LB, which in turn is tied to the application's call.
1675   grpc_pollset_set_add_pollset_set(lb_policy->interested_parties(),
1676                                    interested_parties());
1677   return lb_policy;
1678 }
1679
1680 void GrpcLb::CreateOrUpdateChildPolicyLocked() {
1681   if (shutting_down_) return;
1682   // Construct update args.
1683   UpdateArgs update_args;
1684   bool is_backend_from_grpclb_load_balancer = false;
1685   if (fallback_mode_) {
1686     // If CreateOrUpdateChildPolicyLocked() is invoked when we haven't
1687     // received any serverlist from the balancer, we use the fallback backends
1688     // returned by the resolver. Note that the fallback backend list may be
1689     // empty, in which case the new round_robin policy will keep the requested
1690     // picks pending.
1691     update_args.addresses = fallback_backend_addresses_;
1692   } else {
1693     update_args.addresses = serverlist_->GetServerAddressList(
1694         lb_calld_ == nullptr ? nullptr : lb_calld_->client_stats());
1695     is_backend_from_grpclb_load_balancer = true;
1696   }
1697   update_args.args =
1698       CreateChildPolicyArgsLocked(is_backend_from_grpclb_load_balancer);
1699   GPR_ASSERT(update_args.args != nullptr);
1700   update_args.config = child_policy_config_;
1701   // If the child policy name changes, we need to create a new child
1702   // policy.  When this happens, we leave child_policy_ as-is and store
1703   // the new child policy in pending_child_policy_.  Once the new child
1704   // policy transitions into state READY, we swap it into child_policy_,
1705   // replacing the original child policy.  So pending_child_policy_ is
1706   // non-null only between when we apply an update that changes the child
1707   // policy name and when the new child reports state READY.
1708   //
1709   // Updates can arrive at any point during this transition.  We always
1710   // apply updates relative to the most recently created child policy,
1711   // even if the most recent one is still in pending_child_policy_.  This
1712   // is true both when applying the updates to an existing child policy
1713   // and when determining whether we need to create a new policy.
1714   //
1715   // As a result of this, there are several cases to consider here:
1716   //
1717   // 1. We have no existing child policy (i.e., we have started up but
1718   //    have not yet received a serverlist from the balancer or gone
1719   //    into fallback mode; in this case, both child_policy_ and
1720   //    pending_child_policy_ are null).  In this case, we create a
1721   //    new child policy and store it in child_policy_.
1722   //
1723   // 2. We have an existing child policy and have no pending child policy
1724   //    from a previous update (i.e., either there has not been a
1725   //    previous update that changed the policy name, or we have already
1726   //    finished swapping in the new policy; in this case, child_policy_
1727   //    is non-null but pending_child_policy_ is null).  In this case:
1728   //    a. If child_policy_->name() equals child_policy_name, then we
1729   //       update the existing child policy.
1730   //    b. If child_policy_->name() does not equal child_policy_name,
1731   //       we create a new policy.  The policy will be stored in
1732   //       pending_child_policy_ and will later be swapped into
1733   //       child_policy_ by the helper when the new child transitions
1734   //       into state READY.
1735   //
1736   // 3. We have an existing child policy and have a pending child policy
1737   //    from a previous update (i.e., a previous update set
1738   //    pending_child_policy_ as per case 2b above and that policy has
1739   //    not yet transitioned into state READY and been swapped into
1740   //    child_policy_; in this case, both child_policy_ and
1741   //    pending_child_policy_ are non-null).  In this case:
1742   //    a. If pending_child_policy_->name() equals child_policy_name,
1743   //       then we update the existing pending child policy.
1744   //    b. If pending_child_policy->name() does not equal
1745   //       child_policy_name, then we create a new policy.  The new
1746   //       policy is stored in pending_child_policy_ (replacing the one
1747   //       that was there before, which will be immediately shut down)
1748   //       and will later be swapped into child_policy_ by the helper
1749   //       when the new child transitions into state READY.
1750   const char* child_policy_name = child_policy_config_ == nullptr
1751                                       ? "round_robin"
1752                                       : child_policy_config_->name();
1753   const bool create_policy =
1754       // case 1
1755       child_policy_ == nullptr ||
1756       // case 2b
1757       (pending_child_policy_ == nullptr &&
1758        strcmp(child_policy_->name(), child_policy_name) != 0) ||
1759       // case 3b
1760       (pending_child_policy_ != nullptr &&
1761        strcmp(pending_child_policy_->name(), child_policy_name) != 0);
1762   LoadBalancingPolicy* policy_to_update = nullptr;
1763   if (create_policy) {
1764     // Cases 1, 2b, and 3b: create a new child policy.
1765     // If child_policy_ is null, we set it (case 1), else we set
1766     // pending_child_policy_ (cases 2b and 3b).
1767     if (grpc_lb_glb_trace.enabled()) {
1768       gpr_log(GPR_INFO, "[grpclb %p] Creating new %schild policy %s", this,
1769               child_policy_ == nullptr ? "" : "pending ", child_policy_name);
1770     }
1771     auto new_policy =
1772         CreateChildPolicyLocked(child_policy_name, update_args.args);
1773     // Swap the policy into place.
1774     auto& lb_policy =
1775         child_policy_ == nullptr ? child_policy_ : pending_child_policy_;
1776     {
1777       MutexLock lock(&child_policy_mu_);
1778       lb_policy = std::move(new_policy);
1779     }
1780     policy_to_update = lb_policy.get();
1781   } else {
1782     // Cases 2a and 3a: update an existing policy.
1783     // If we have a pending child policy, send the update to the pending
1784     // policy (case 3a), else send it to the current policy (case 2a).
1785     policy_to_update = pending_child_policy_ != nullptr
1786                            ? pending_child_policy_.get()
1787                            : child_policy_.get();
1788   }
1789   GPR_ASSERT(policy_to_update != nullptr);
1790   // Update the policy.
1791   if (grpc_lb_glb_trace.enabled()) {
1792     gpr_log(GPR_INFO, "[grpclb %p] Updating %schild policy %p", this,
1793             policy_to_update == pending_child_policy_.get() ? "pending " : "",
1794             policy_to_update);
1795   }
1796   policy_to_update->UpdateLocked(std::move(update_args));
1797 }
1798
1799 //
1800 // factory
1801 //
1802
1803 class GrpcLbFactory : public LoadBalancingPolicyFactory {
1804  public:
1805   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
1806       LoadBalancingPolicy::Args args) const override {
1807     return OrphanablePtr<LoadBalancingPolicy>(New<GrpcLb>(std::move(args)));
1808   }
1809
1810   const char* name() const override { return kGrpclb; }
1811 };
1812
1813 }  // namespace
1814
1815 }  // namespace grpc_core
1816
1817 //
1818 // Plugin registration
1819 //
1820
1821 namespace {
1822
1823 // Only add client_load_reporting filter if the grpclb LB policy is used.
1824 bool maybe_add_client_load_reporting_filter(grpc_channel_stack_builder* builder,
1825                                             void* arg) {
1826   const grpc_channel_args* args =
1827       grpc_channel_stack_builder_get_channel_arguments(builder);
1828   const grpc_arg* channel_arg =
1829       grpc_channel_args_find(args, GRPC_ARG_LB_POLICY_NAME);
1830   if (channel_arg != nullptr && channel_arg->type == GRPC_ARG_STRING &&
1831       strcmp(channel_arg->value.string, "grpclb") == 0) {
1832     return grpc_channel_stack_builder_append_filter(
1833         builder, (const grpc_channel_filter*)arg, nullptr, nullptr);
1834   }
1835   return true;
1836 }
1837
1838 }  // namespace
1839
1840 void grpc_lb_policy_grpclb_init() {
1841   grpc_core::LoadBalancingPolicyRegistry::Builder::
1842       RegisterLoadBalancingPolicyFactory(
1843           grpc_core::UniquePtr<grpc_core::LoadBalancingPolicyFactory>(
1844               grpc_core::New<grpc_core::GrpcLbFactory>()));
1845   grpc_channel_init_register_stage(GRPC_CLIENT_SUBCHANNEL,
1846                                    GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
1847                                    maybe_add_client_load_reporting_filter,
1848                                    (void*)&grpc_client_load_reporting_filter);
1849 }
1850
1851 void grpc_lb_policy_grpclb_shutdown() {}