2a72910456969f25a0dcb87ae502a05573e4de18
[platform/upstream/grpc.git] / test / core / end2end / tests / retry_lb_drop.cc
1 //
2 // Copyright 2017 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16
17 #include "test/core/end2end/end2end_tests.h"
18
19 #include <stdio.h>
20 #include <string.h>
21
22 #include <grpc/byte_buffer.h>
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26 #include <grpc/support/string_util.h>
27 #include <grpc/support/time.h>
28
29 #include "src/core/ext/filters/client_channel/lb_policy_registry.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/gpr/string.h"
32 #include "src/core/lib/gpr/useful.h"
33 #include "src/core/lib/iomgr/exec_ctx.h"
34 #include "src/core/lib/transport/static_metadata.h"
35
36 #include "test/core/end2end/cq_verifier.h"
37 #include "test/core/end2end/tests/cancel_test_helpers.h"
38 #include "test/core/util/test_lb_policies.h"
39
40 namespace grpc_core {
41 namespace {
42
// Name under which the drop LB policy is registered. Declared as a constexpr
// array so the name is a true compile-time constant and cannot be re-pointed
// at runtime (the previous `const char*` was a mutable global pointer).
constexpr char kDropPolicyName[] = "drop_lb";
44
45 class DropPolicy : public LoadBalancingPolicy {
46  public:
47   explicit DropPolicy(Args args) : LoadBalancingPolicy(std::move(args)) {}
48
49   const char* name() const override { return kDropPolicyName; }
50
51   void UpdateLocked(UpdateArgs) override {
52     channel_control_helper()->UpdateState(GRPC_CHANNEL_READY, absl::Status(),
53                                           absl::make_unique<DropPicker>());
54   }
55
56   void ResetBackoffLocked() override {}
57   void ShutdownLocked() override {}
58
59  private:
60   class DropPicker : public SubchannelPicker {
61    public:
62     PickResult Pick(PickArgs /*args*/) override {
63       return PickResult::Drop(
64           absl::UnavailableError("Call dropped by drop LB policy"));
65     }
66   };
67 };
68
69 class DropLbConfig : public LoadBalancingPolicy::Config {
70  public:
71   const char* name() const override { return kDropPolicyName; }
72 };
73
74 class DropPolicyFactory : public LoadBalancingPolicyFactory {
75  public:
76   OrphanablePtr<LoadBalancingPolicy> CreateLoadBalancingPolicy(
77       LoadBalancingPolicy::Args args) const override {
78     return MakeOrphanable<DropPolicy>(std::move(args));
79   }
80
81   const char* name() const override { return kDropPolicyName; }
82
83   RefCountedPtr<LoadBalancingPolicy::Config> ParseLoadBalancingConfig(
84       const Json& /*json*/, grpc_error_handle* /*error*/) const override {
85     return MakeRefCounted<DropLbConfig>();
86   }
87 };
88
89 std::vector<PickArgsSeen>* g_pick_args_vector = nullptr;
90
91 void RegisterDropPolicy() {
92   LoadBalancingPolicyRegistry::Builder::RegisterLoadBalancingPolicyFactory(
93       absl::make_unique<DropPolicyFactory>());
94   RegisterTestPickArgsLoadBalancingPolicy(
95       [](const PickArgsSeen& pick_args) {
96         GPR_ASSERT(g_pick_args_vector != nullptr);
97         g_pick_args_vector->push_back(pick_args);
98       },
99       kDropPolicyName);
100 }
101
102 }  // namespace
103 }  // namespace grpc_core
104
// Converts an integer into the opaque pointer form used to tag completion
// queue operations.
static void* tag(intptr_t t) {
  void* const as_pointer = reinterpret_cast<void*>(t);
  return as_pointer;
}
106
107 static grpc_end2end_test_fixture begin_test(grpc_end2end_test_config config,
108                                             const char* test_name,
109                                             grpc_channel_args* client_args,
110                                             grpc_channel_args* server_args) {
111   grpc_end2end_test_fixture f;
112   gpr_log(GPR_INFO, "Running test: %s/%s", test_name, config.name);
113   f = config.create_fixture(client_args, server_args);
114   config.init_server(&f, server_args);
115   config.init_client(&f, client_args);
116   return f;
117 }
118
119 static gpr_timespec n_seconds_from_now(int n) {
120   return grpc_timeout_seconds_to_deadline(n);
121 }
122
123 static gpr_timespec five_seconds_from_now(void) {
124   return n_seconds_from_now(5);
125 }
126
127 static void drain_cq(grpc_completion_queue* cq) {
128   grpc_event ev;
129   do {
130     ev = grpc_completion_queue_next(cq, five_seconds_from_now(), nullptr);
131   } while (ev.type != GRPC_QUEUE_SHUTDOWN);
132 }
133
134 static void shutdown_server(grpc_end2end_test_fixture* f) {
135   if (!f->server) return;
136   grpc_server_shutdown_and_notify(f->server, f->shutdown_cq, tag(1000));
137   GPR_ASSERT(grpc_completion_queue_pluck(f->shutdown_cq, tag(1000),
138                                          grpc_timeout_seconds_to_deadline(5),
139                                          nullptr)
140                  .type == GRPC_OP_COMPLETE);
141   grpc_server_destroy(f->server);
142   f->server = nullptr;
143 }
144
145 static void shutdown_client(grpc_end2end_test_fixture* f) {
146   if (!f->client) return;
147   grpc_channel_destroy(f->client);
148   f->client = nullptr;
149 }
150
151 static void end_test(grpc_end2end_test_fixture* f) {
152   shutdown_server(f);
153   shutdown_client(f);
154
155   grpc_completion_queue_shutdown(f->cq);
156   drain_cq(f->cq);
157   grpc_completion_queue_destroy(f->cq);
158   grpc_completion_queue_destroy(f->shutdown_cq);
159 }
160
161 // Tests that we don't retry when the LB policy drops a call,
162 // even when there is retry configuration in the service config.
163 // - 1 retry allowed for UNAVAILABLE status
164 // - first attempt returns UNAVAILABLE due to LB drop but does not retry
165 static void test_retry_lb_drop(grpc_end2end_test_config config) {
166   grpc_call* c;
167   grpc_op ops[6];
168   grpc_op* op;
169   grpc_metadata_array initial_metadata_recv;
170   grpc_metadata_array trailing_metadata_recv;
171   grpc_slice request_payload_slice = grpc_slice_from_static_string("foo");
172   grpc_byte_buffer* request_payload =
173       grpc_raw_byte_buffer_create(&request_payload_slice, 1);
174   grpc_byte_buffer* response_payload_recv = nullptr;
175   grpc_status_code status;
176   grpc_call_error error;
177   grpc_slice details;
178
179   std::vector<grpc_core::PickArgsSeen> pick_args_seen;
180   grpc_core::g_pick_args_vector = &pick_args_seen;
181
182   grpc_arg args[] = {
183       grpc_channel_arg_string_create(
184           const_cast<char*>(GRPC_ARG_SERVICE_CONFIG),
185           const_cast<char*>(
186               "{\n"
187               "  \"loadBalancingConfig\": [ {\n"
188               "    \"test_pick_args_lb\": {}\n"
189               "  } ],\n"
190               "  \"methodConfig\": [ {\n"
191               "    \"name\": [\n"
192               "      { \"service\": \"service\", \"method\": \"method\" }\n"
193               "    ],\n"
194               "    \"retryPolicy\": {\n"
195               "      \"maxAttempts\": 2,\n"
196               "      \"initialBackoff\": \"1s\",\n"
197               "      \"maxBackoff\": \"120s\",\n"
198               "      \"backoffMultiplier\": 1.6,\n"
199               "      \"retryableStatusCodes\": [ \"UNAVAILABLE\" ]\n"
200               "    }\n"
201               "  } ]\n"
202               "}")),
203   };
204   grpc_channel_args client_args = {GPR_ARRAY_SIZE(args), args};
205   grpc_end2end_test_fixture f =
206       begin_test(config, "retry_lb_drop", &client_args, nullptr);
207
208   cq_verifier* cqv = cq_verifier_create(f.cq);
209
210   gpr_timespec deadline = five_seconds_from_now();
211   c = grpc_channel_create_call(f.client, nullptr, GRPC_PROPAGATE_DEFAULTS, f.cq,
212                                grpc_slice_from_static_string("/service/method"),
213                                nullptr, deadline, nullptr);
214   GPR_ASSERT(c);
215
216   grpc_metadata_array_init(&initial_metadata_recv);
217   grpc_metadata_array_init(&trailing_metadata_recv);
218
219   memset(ops, 0, sizeof(ops));
220   op = ops;
221   op->op = GRPC_OP_SEND_INITIAL_METADATA;
222   op->data.send_initial_metadata.count = 0;
223   op++;
224   op->op = GRPC_OP_SEND_MESSAGE;
225   op->data.send_message.send_message = request_payload;
226   op++;
227   op->op = GRPC_OP_RECV_MESSAGE;
228   op->data.recv_message.recv_message = &response_payload_recv;
229   op++;
230   op->op = GRPC_OP_SEND_CLOSE_FROM_CLIENT;
231   op++;
232   op->op = GRPC_OP_RECV_INITIAL_METADATA;
233   op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
234   op++;
235   op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
236   op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
237   op->data.recv_status_on_client.status = &status;
238   op->data.recv_status_on_client.status_details = &details;
239   op++;
240   error = grpc_call_start_batch(c, ops, static_cast<size_t>(op - ops), tag(1),
241                                 nullptr);
242   GPR_ASSERT(GRPC_CALL_OK == error);
243
244   CQ_EXPECT_COMPLETION(cqv, tag(1), true);
245   cq_verify(cqv);
246
247   GPR_ASSERT(status == GRPC_STATUS_UNAVAILABLE);
248   GPR_ASSERT(0 ==
249              grpc_slice_str_cmp(details, "Call dropped by drop LB policy"));
250
251   grpc_slice_unref(details);
252   grpc_metadata_array_destroy(&initial_metadata_recv);
253   grpc_metadata_array_destroy(&trailing_metadata_recv);
254   grpc_byte_buffer_destroy(request_payload);
255   grpc_byte_buffer_destroy(response_payload_recv);
256
257   grpc_call_unref(c);
258
259   cq_verifier_destroy(cqv);
260
261   gpr_log(GPR_INFO, "NUMBER OF LB PICKS: %" PRIuPTR, pick_args_seen.size());
262   GPR_ASSERT(pick_args_seen.size() == 1);
263
264   grpc_core::g_pick_args_vector = nullptr;
265
266   end_test(&f);
267   config.tear_down_data(&f);
268 }
269
270 void retry_lb_drop(grpc_end2end_test_config config) {
271   GPR_ASSERT(config.feature_mask & FEATURE_MASK_SUPPORTS_CLIENT_CHANNEL);
272   test_retry_lb_drop(config);
273 }
274
275 void retry_lb_drop_pre_init(void) { grpc_core::RegisterDropPolicy(); }