2 // Copyright 2017 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
17 #include "test/core/end2end/end2end_tests.h"
22 #include <grpc/byte_buffer.h>
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27 #include <grpc/support/time.h>
29 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gpr/useful.h"
33 #include "src/core/lib/iomgr/exec_ctx.h"
34 #include "src/core/lib/transport/static_metadata.h"
36 #include "test/core/end2end/cq_verifier.h"
37 #include "test/core/end2end/tests/cancel_test_helpers.h"
38 #include "test/core/util/test_lb_policies.h"
// Name of the drop LB policy defined below. It is what the name() overrides of
// DropPolicy, DropLbConfig and DropPolicyFactory all return.
43 const char* kDropPolicyName = "drop_lb";
// Test-only LB policy: on every update it reports GRPC_CHANNEL_READY together
// with a picker whose Pick() always returns a drop result.
// NOTE(review): this listing has lost short lines (access specifiers, closing
// braces). DropPicker is presumably nested inside DropPolicy -- confirm
// against the upstream file before relying on the structure shown here.
45 class DropPolicy : public LoadBalancingPolicy {
// Forwards the construction arguments to the LoadBalancingPolicy base.
47 explicit DropPolicy(Args args) : LoadBalancingPolicy(std::move(args)) {}
// Returns the drop policy's name.
49 const char* name() const override { return kDropPolicyName; }
// The update argument is unnamed and unused: the policy unconditionally
// publishes READY with an OK status and a newly created DropPicker.
51 void UpdateLocked(UpdateArgs) override {
52 channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
53 absl::make_unique<DropPicker>());
// Both of the following overrides are no-ops in this policy.
56 void ResetBackoffLocked() override {}
57 void ShutdownLocked() override {}
// Picker handed to the channel by UpdateLocked(); it ignores the pick
// arguments and drops the call with an UNAVAILABLE status carrying the message
// that the test later compares against.
60 class DropPicker : public SubchannelPicker {
62 PickResult Pick(PickArgs /*args*/) override {
63 return PickResult::Drop(
64 absl::UnavailableError("Call dropped by drop LB policy"));
// LB policy config type returned by DropPolicyFactory::ParseLoadBalancingConfig.
// As far as this listing shows, it carries no data and only reports the
// policy name.
69 class DropLbConfig : public LoadBalancingPolicy::Config {
71 const char* name() const override { return kDropPolicyName; }
// Factory that RegisterDropPolicy() registers with the LB policy registry; it
// creates DropPolicy instances and DropLbConfig objects.
74 class DropPolicyFactory : public LoadBalancingPolicyFactory {
// Builds a new DropPolicy, handing it the supplied args.
76 OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
77 LoadBalancingPolicy::Args args) const override {
78 return MakeOrphanable<DropPolicy>(std::move(args));
// Returns the name this factory is known by (the drop policy name).
81 const char* name() const override { return kDropPolicyName; }
// Ignores both the JSON input and the error out-parameter and always returns a
// fresh DropLbConfig.
83 RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
84 const Json& /*json*/, grpc_error_handle* /*error*/) const override {
85 return MakeRefCounted<DropLbConfig>();
// Where the pick-args callback registered in RegisterDropPolicy() appends each
// observed pick. Starts out null; test_retry_lb_drop() points it at a local
// vector for the duration of the test and resets it to null afterwards.
89 std::vector<PickArgsSeen>* g_pick_args_vector = nullptr;
// Registers the LB policies this test needs: the DropPolicyFactory, and the
// test pick-args policy with a callback that records every pick it is told
// about into g_pick_args_vector.
// NOTE(review): the remaining argument(s) of
// RegisterTestPickArgsLoadBalancingPolicy and the closing braces are not
// visible in this listing -- confirm against the upstream file.
91 void RegisterDropPolicy() {
92 LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
93 absl::make_unique<DropPolicyFactory>());
94 RegisterTestPickArgsLoadBalancingPolicy(
95 [](const PickArgsSeen& pick_args) {
// The callback requires that a test has installed a destination vector.
96 GPR_ASSERT(g_pick_args_vector != nullptr);
97 g_pick_args_vector->push_back(pick_args);
103 } // namespace grpc_core
// Converts an integer id into the opaque void* tag form that this file passes
// to the server-shutdown, completion-queue and batch calls.
static void* tag(intptr_t t) { return reinterpret_cast<void*>(t); }
107 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
108 const char* test_name,
109 grpc_channel_args* client_args,
110 grpc_channel_args* server_args) {
111 grpc_end2end_test_fixture f;
112 gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
113 f = config.create_fixture(client_args, server_args);
114 config.init_server(&f, server_args);
115 config.init_client(&f, client_args);
119 static gpr_timespec n_seconds_from_now(int n) {
120 return grpc_timeout_seconds_to_deadline(n);
123 static gpr_timespec five_seconds_from_now(void) {
124 return n_seconds_from_now(5);
127 static void drain_cq(grpc_completion_queue* cq) {
130 ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
131 } while (ev.type != GRPC_QUEUE_SHUTDOWN);
134 static void shutdown_server(grpc_end2end_test_fixture* f) {
135 if (!f->server) return;
136 grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
137 GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
138 grpc_timeout_seconds_to_deadline(5),
140 .type == GRPC_OP_COMPLETE);
141 grpc_server_destroy(f->server);
145 static void shutdown_client(grpc_end2end_test_fixture* f) {
146 if (!f->client) return;
147 grpc_channel_destroy(f->client);
151 static void end_test(grpc_end2end_test_fixture* f) {
155 grpc_completion_queue_shutdown(f->cq);
157 grpc_completion_queue_destroy(f->cq);
158 grpc_completion_queue_destroy(f->shutdown_cq);
161 // Tests that we don't retry when the LB policy drops a call,
162 // even when there is retry configuration in the service config.
163 // - 1 retry allowed for UNAVAILABLE status
164 // - first attempt returns UNAVAILABLE due to LB drop but does not retry
// Runs one client call against a channel whose service config selects the
// test_pick_args_lb policy and a retry policy for UNAVAILABLE, then checks that
// the call ends with UNAVAILABLE and that exactly one LB pick was recorded.
// NOTE(review): this listing has lost short lines. `c`, `ops`, `op`, `details`
// and `args` are used below but their declarations are not visible, and the
// batch-building section shows no statements advancing `op`. Restore these
// from the upstream file before compiling.
165 static void test_retry_lb_drop(grpc_end2end_test_config config) {
// Receive-side state and the "foo" request payload for the single call.
169 grpc_metadata_array initial_metadata_recv;
170 grpc_metadata_array trailing_metadata_recv;
171 grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
172 grpc_byte_buffer* request_payload =
173 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
174 grpc_byte_buffer* response_payload_recv = nullptr;
175 grpc_status_code status;
176 grpc_call_error error;
// Point the global pick-args sink at a local vector so the picks made during
// this test can be counted at the end.
179 std::vector<grpc_core::PickArgsSeen> pick_args_seen;
180 grpc_core::g_pick_args_vector = &pick_args_seen;
// Channel arg carrying the service config. The visible fragments select the
// test_pick_args_lb policy and, for service/method, a retry policy with
// maxAttempts 2 that lists UNAVAILABLE as retryable.
183 grpc_channel_arg_string_create(
184 const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
187 " \"loadBalancingConfig\": [ {\n"
188 " \"test_pick_args_lb\": {}\n"
190 " \"methodConfig\": [ {\n"
192 " { \"service\": \"service\", \"method\": \"method\" }\n"
194 " \"retryPolicy\": {\n"
195 " \"maxAttempts\": 2,\n"
196 " \"initialBackoff\": \"1s\",\n"
197 " \"maxBackoff\": \"120s\",\n"
198 " \"backoffMultiplier\": 1.6,\n"
199 " \"retryableStatusCodes\": [ \"UNAVAILABLE\" ]\n"
// Only client-side channel args are supplied; the server gets nullptr.
204 grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args};
205 grpc_end2end_test_fixture f =
206 begin_test(config, "retry_lb_drop", &client_args, nullptr);
208 cq_verifier* cqv = cq_verifier_create(f.cq);
// Create the call to /service/method with a five-second deadline.
210 gpr_timespec deadline = five_seconds_from_now();
211 c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
212 grpc_slice_from_static_string("/service/method"),
213 nullptr, deadline, nullptr);
216 grpc_metadata_array_init(&initial_metadata_recv);
217 grpc_metadata_array_init(&trailing_metadata_recv);
// Build one batch: send initial metadata, send the request message, receive a
// message, half-close, receive initial metadata, and receive the final status.
219 memset(ops, 0, sizeof(ops));
221 op->op = GRPC_OP_SEND_INITIAL_METADATA;
222 op->data.send_initial_metadata.count = 0;
224 op->op = GRPC_OP_SEND_MESSAGE;
225 op->data.send_message.send_message = request_payload;
227 op->op = GRPC_OP_RECV_MESSAGE;
228 op->data.recv_message.recv_message = &response_payload_recv;
230 op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
232 op->op = GRPC_OP_RECV_INITIAL_METADATA;
233 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
235 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
236 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
237 op->data.recv_status_on_client.status = &status;
238 op->data.recv_status_on_client.status_details = &details;
// Start the batch under tag(1); the number of ops is derived from how far `op`
// has advanced past `ops`.
240 error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
242 GPR_ASSERT(GRPC_CALL_OK == error);
// The batch itself is expected to complete successfully.
244 CQ_EXPECT_COMPLETION(cqv, tag(1), true);
// The call must have failed with UNAVAILABLE.
247 GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
// NOTE(review): the next line looks like the tail of an assertion comparing the
// status details with the drop message; its opening line is not visible.
249 grpc_slice_str_cmp(details, "Call dropped by drop LB policy"));
// Release everything the call produced.
251 grpc_slice_unref(details);
252 grpc_metadata_array_destroy(&initial_metadata_recv);
253 grpc_metadata_array_destroy(&trailing_metadata_recv);
254 grpc_byte_buffer_destroy(request_payload);
255 grpc_byte_buffer_destroy(response_payload_recv);
259 cq_verifier_destroy(cqv);
// Exactly one recorded pick means the dropped call was not attempted again.
261 gpr_log(GPR_INFO, "NUMBER OF LB PICKS: %" PRIuPTR, pick_args_seen.size());
262 GPR_ASSERT(pick_args_seen.size() == 1);
// Detach the global from the local vector before it goes out of scope.
264 grpc_core::g_pick_args_vector = nullptr;
267 config.tear_down_data(&f);
270 void retry_lb_drop(grpc_end2end_test_config config) {
271 GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
272 test_retry_lb_drop(config);
275 void retry_lb_drop_pre_init(void) { grpc_core::RegisterDropPolicy(); }