3 * Copyright 2020 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/port_platform.h>
28 #include <gmock/gmock.h>
30 #include <grpc/grpc.h>
31 #include <grpc/grpc_security.h>
32 #include <grpc/impl/codegen/grpc_types.h>
33 #include <grpc/slice.h>
34 #include <grpc/support/alloc.h>
35 #include <grpc/support/log.h>
36 #include <grpc/support/string_util.h>
37 #include <grpc/support/time.h>
39 #include "absl/strings/str_cat.h"
40 #include "absl/strings/str_format.h"
41 #include "absl/types/optional.h"
43 #include "src/core/ext/filters/client_channel/resolver/fake/fake_resolver.h"
44 #include "src/core/lib/gpr/useful.h"
45 #include "src/core/lib/gprpp/host_port.h"
46 #include "src/core/lib/gprpp/thd.h"
47 #include "src/core/lib/iomgr/error.h"
48 #include "src/core/lib/iomgr/parse_address.h"
49 #include "src/core/lib/security/credentials/alts/alts_credentials.h"
50 #include "src/core/lib/security/credentials/credentials.h"
51 #include "src/core/lib/security/security_connector/alts/alts_security_connector.h"
52 #include "src/core/lib/slice/slice_string_helpers.h"
53 #include "src/core/lib/uri/uri_parser.h"
55 #include "test/core/util/memory_counters.h"
56 #include "test/core/util/port.h"
57 #include "test/core/util/test_config.h"
59 #include "test/core/end2end/cq_verifier.h"
63 const int kNumMessagePingPongsPerCall = 4000;
66 explicit TestCall(grpc_channel* channel, grpc_call* call,
67 grpc_completion_queue* cq)
68 : channel(channel), call(call), cq(cq) {}
70 TestCall(const TestCall& other) = delete;
71 TestCall& operator=(const TestCall& other) = delete;
74 grpc_call_cancel(call, nullptr);
75 grpc_call_unref(call);
76 grpc_channel_destroy(channel);
77 grpc_completion_queue_shutdown(cq);
78 while (grpc_completion_queue_next(cq, gpr_inf_future(GPR_CLOCK_REALTIME),
80 .type != GRPC_QUEUE_SHUTDOWN) {
82 grpc_completion_queue_destroy(cq);
85 grpc_channel* channel;
87 grpc_completion_queue* cq;
88 absl::optional<grpc_status_code>
89 status; // filled in when the call is finished
92 void StartCall(TestCall* test_call) {
95 memset(ops, 0, sizeof(ops));
97 op->op = GRPC_OP_SEND_INITIAL_METADATA;
98 op->data.send_initial_metadata.count = 0;
99 op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY;
100 op->reserved = nullptr;
102 void* tag = test_call;
103 grpc_call_error error = grpc_call_start_batch(
104 test_call->call, ops, static_cast<size_t>(op - ops), tag, nullptr);
105 GPR_ASSERT(GRPC_CALL_OK == error);
106 cq_verifier* cqv = cq_verifier_create(test_call->cq);
107 CQ_EXPECT_COMPLETION(cqv, tag, 1);
109 cq_verifier_destroy(cqv);
112 void SendMessage(grpc_call* call, grpc_completion_queue* cq) {
113 grpc_slice request_payload_slice = grpc_slice_from_copied_string("a");
114 grpc_byte_buffer* request_payload =
115 grpc_raw_byte_buffer_create(&request_payload_slice, 1);
118 memset(ops, 0, sizeof(ops));
120 op->op = GRPC_OP_SEND_MESSAGE;
121 op->data.send_message.send_message = request_payload;
122 op->reserved = nullptr;
125 grpc_call_error error = grpc_call_start_batch(
126 call, ops, static_cast<size_t>(op - ops), tag, nullptr);
127 GPR_ASSERT(GRPC_CALL_OK == error);
128 cq_verifier* cqv = cq_verifier_create(cq);
129 CQ_EXPECT_COMPLETION(cqv, tag, 1);
131 cq_verifier_destroy(cqv);
132 grpc_byte_buffer_destroy(request_payload);
135 void ReceiveMessage(grpc_call* call, grpc_completion_queue* cq) {
136 grpc_byte_buffer* request_payload = nullptr;
139 memset(ops, 0, sizeof(ops));
141 op->op = GRPC_OP_RECV_MESSAGE;
142 op->data.recv_message.recv_message = &request_payload;
143 op->reserved = nullptr;
146 grpc_call_error error = grpc_call_start_batch(
147 call, ops, static_cast<size_t>(op - ops), tag, nullptr);
148 GPR_ASSERT(GRPC_CALL_OK == error);
149 cq_verifier* cqv = cq_verifier_create(cq);
150 CQ_EXPECT_COMPLETION(cqv, tag, 1);
152 cq_verifier_destroy(cqv);
153 grpc_byte_buffer_destroy(request_payload);
156 void ReceiveInitialMetadata(TestCall* test_call, gpr_timespec deadline) {
157 grpc_metadata_array initial_metadata_recv;
158 grpc_metadata_array_init(&initial_metadata_recv);
161 memset(ops, 0, sizeof(ops));
163 op->op = GRPC_OP_RECV_INITIAL_METADATA;
164 op->data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv;
165 op->reserved = nullptr;
167 void* tag = test_call;
168 grpc_call_error error = grpc_call_start_batch(
169 test_call->call, ops, static_cast<size_t>(op - ops), tag, nullptr);
170 GPR_ASSERT(GRPC_CALL_OK == error);
172 grpc_completion_queue_next(test_call->cq, deadline, nullptr);
173 if (event.type != GRPC_OP_COMPLETE || !event.success) {
175 "Wanted op complete with success, got op type:%d success:%d",
176 event.type, event.success);
179 GPR_ASSERT(event.tag == tag);
180 grpc_metadata_array_destroy(&initial_metadata_recv);
183 void FinishCall(TestCall* test_call) {
186 grpc_metadata_array trailing_metadata_recv;
187 grpc_status_code status;
189 grpc_metadata_array_init(&trailing_metadata_recv);
190 memset(ops, 0, sizeof(ops));
192 op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
193 op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv;
194 op->data.recv_status_on_client.status = &status;
195 op->data.recv_status_on_client.status_details = &details;
197 op->reserved = nullptr;
199 void* tag = test_call;
200 grpc_call_error error = grpc_call_start_batch(
201 test_call->call, ops, static_cast<size_t>(op - ops), tag, nullptr);
202 GPR_ASSERT(GRPC_CALL_OK == error);
203 grpc_event event = grpc_completion_queue_next(
204 test_call->cq, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
205 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
206 GPR_ASSERT(event.success);
207 GPR_ASSERT(event.tag == tag);
208 test_call->status = status;
209 grpc_metadata_array_destroy(&trailing_metadata_recv);
210 grpc_slice_unref(details);
215 explicit TestServer() {
216 cq_ = grpc_completion_queue_create_for_next(nullptr);
217 server_ = grpc_server_create(nullptr, nullptr);
219 grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
220 grpc_server_register_completion_queue(server_, cq_, nullptr);
221 GPR_ASSERT(grpc_server_add_insecure_http2_port(server_, address_.c_str()));
222 grpc_server_start(server_);
223 thread_ = std::thread(std::bind(&TestServer::AcceptThread, this));
228 void* shutdown_and_notify_tag = this;
229 grpc_server_shutdown_and_notify(server_, cq_, shutdown_and_notify_tag);
230 grpc_event event = grpc_completion_queue_next(
231 cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
232 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
233 GPR_ASSERT(event.tag == shutdown_and_notify_tag);
234 GPR_ASSERT(event.success);
235 grpc_server_destroy(server_);
236 grpc_completion_queue_shutdown(cq_);
237 while (grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
239 .type != GRPC_QUEUE_SHUTDOWN) {
241 grpc_completion_queue_destroy(cq_);
244 std::string address() const { return address_; }
247 void AcceptThread() {
248 grpc_call_details call_details;
249 grpc_call_details_init(&call_details);
250 grpc_metadata_array request_metadata_recv;
251 grpc_metadata_array_init(&request_metadata_recv);
252 void* tag = &call_details;
254 grpc_call_error error = grpc_server_request_call(
255 server_, &call, &call_details, &request_metadata_recv, cq_, cq_, tag);
256 GPR_ASSERT(error == GRPC_CALL_OK);
257 grpc_event event = grpc_completion_queue_next(
258 cq_, gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
259 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
260 GPR_ASSERT(event.success);
261 GPR_ASSERT(event.tag == tag);
264 memset(ops, 0, sizeof(ops));
266 op->op = GRPC_OP_SEND_INITIAL_METADATA;
267 op->data.send_initial_metadata.count = 0;
268 op->reserved = nullptr;
270 error = grpc_call_start_batch(call, ops, static_cast<size_t>(op - ops), tag,
272 GPR_ASSERT(GRPC_CALL_OK == error);
273 event = grpc_completion_queue_next(cq_, gpr_inf_future(GPR_CLOCK_REALTIME),
275 GPR_ASSERT(event.type == GRPC_OP_COMPLETE);
276 GPR_ASSERT(event.success);
277 GPR_ASSERT(event.tag == tag);
278 for (int i = 0; i < kNumMessagePingPongsPerCall; i++) {
279 ReceiveMessage(call, cq_);
280 SendMessage(call, cq_);
282 grpc_call_cancel_with_status(call, GRPC_STATUS_PERMISSION_DENIED,
283 "test status", nullptr);
284 grpc_metadata_array_destroy(&request_metadata_recv);
285 grpc_call_details_destroy(&call_details);
286 grpc_call_unref(call);
289 grpc_server* server_;
290 grpc_completion_queue* cq_;
291 std::string address_;
295 grpc_core::Resolver::Result BuildResolverResponse(
296 const std::vector<std::string>& addresses) {
297 grpc_core::Resolver::Result result;
298 for (const auto& address_str : addresses) {
299 absl::StatusOr<grpc_core::URI> uri = grpc_core::URI::Parse(address_str);
301 gpr_log(GPR_ERROR, "Failed to parse. Error: %s",
302 uri.status().ToString().c_str());
303 GPR_ASSERT(uri.ok());
305 grpc_resolved_address address;
306 GPR_ASSERT(grpc_parse_uri(*uri, &address));
307 result.addresses.emplace_back(address.addr, address.len, nullptr);
312 // Perform a simple RPC where the server cancels the request with
313 // grpc_call_cancel_with_status
314 TEST(Pollers, TestReadabilityNotificationsDontGetStrandedOnOneCq) {
315 gpr_log(GPR_DEBUG, "test thread");
316 /* 64 is a somewhat arbitary number, the important thing is that it
317 * exceeds the value of MAX_EPOLL_EVENTS_HANDLED_EACH_POLL_CALL (16), which
318 * is enough to repro a bug at time of writing. */
319 const int kNumCalls = 64;
320 size_t ping_pong_round = 0;
321 size_t ping_pongs_done = 0;
322 grpc_core::Mutex ping_pong_round_mu;
323 grpc_core::CondVar ping_pong_round_cv;
324 const std::string kSharedUnconnectableAddress =
325 grpc_core::JoinHostPort("127.0.0.1", grpc_pick_unused_port_or_die());
326 gpr_log(GPR_DEBUG, "created unconnectable address:%s",
327 kSharedUnconnectableAddress.c_str());
328 std::vector<std::thread> threads;
329 threads.reserve(kNumCalls);
330 std::vector<std::unique_ptr<TestServer>> test_servers;
331 // Instantiate servers inline here, so that we get port allocation out of the
332 // way and don't depend on it during the actual test. It can sometimes take
333 // time to allocate kNumCalls ports from the port server, and we don't want to
334 // hit test timeouts because of that.
335 test_servers.reserve(kNumCalls);
336 for (int i = 0; i < kNumCalls; i++) {
337 test_servers.push_back(absl::make_unique<TestServer>());
339 for (int i = 0; i < kNumCalls; i++) {
340 auto test_server = test_servers[i].get();
341 threads.push_back(std::thread([kSharedUnconnectableAddress,
342 &ping_pong_round, &ping_pongs_done,
343 &ping_pong_round_mu, &ping_pong_round_cv,
345 gpr_log(GPR_DEBUG, "using test_server with address:%s",
346 test_server->address().c_str());
347 std::vector<grpc_arg> args;
348 grpc_arg service_config_arg;
349 service_config_arg.type = GRPC_ARG_STRING;
350 service_config_arg.key = const_cast<char*>(GRPC_ARG_SERVICE_CONFIG);
351 service_config_arg.value.string =
352 const_cast<char*>("{\"loadBalancingConfig\":[{\"round_robin\":{}}]}");
353 args.push_back(service_config_arg);
354 auto fake_resolver_response_generator =
355 grpc_core::MakeRefCounted<grpc_core::FakeResolverResponseGenerator>();
357 grpc_core::ExecCtx exec_ctx;
358 fake_resolver_response_generator->SetResponse(BuildResolverResponse(
359 {absl::StrCat("ipv4:", kSharedUnconnectableAddress),
360 absl::StrCat("ipv4:", test_server->address())}));
362 args.push_back(grpc_core::FakeResolverResponseGenerator::MakeChannelArg(
363 fake_resolver_response_generator.get()));
364 grpc_channel_args* channel_args =
365 grpc_channel_args_copy_and_add(nullptr, args.data(), args.size());
366 grpc_channel* channel = grpc_insecure_channel_create(
367 "fake:///test.server.com", channel_args, nullptr);
368 grpc_channel_args_destroy(channel_args);
369 grpc_completion_queue* cq =
370 grpc_completion_queue_create_for_next(nullptr);
371 grpc_call* call = grpc_channel_create_call(
372 channel, nullptr, GRPC_PROPAGATE_DEFAULTS, cq,
373 grpc_slice_from_static_string("/foo"), nullptr,
374 gpr_inf_future(GPR_CLOCK_REALTIME), nullptr);
375 auto test_call = absl::make_unique<TestCall>(channel, call, cq);
376 // Start a call, and ensure that round_robin load balancing is configured
377 StartCall(test_call.get());
378 // Make sure the test is doing what it's meant to be doing
379 grpc_channel_info channel_info;
380 memset(&channel_info, 0, sizeof(channel_info));
381 char* lb_policy_name = nullptr;
382 channel_info.lb_policy_name = &lb_policy_name;
383 grpc_channel_get_info(channel, &channel_info);
384 EXPECT_EQ(std::string(lb_policy_name), "round_robin")
385 << "not using round robin; this test has a low chance of hitting the "
386 "bug that it's meant to try to hit";
387 gpr_free(lb_policy_name);
388 // Receive initial metadata
390 "now receive initial metadata on call with server address:%s",
391 test_server->address().c_str());
392 ReceiveInitialMetadata(test_call.get(),
393 grpc_timeout_seconds_to_deadline(30));
394 for (int i = 1; i <= kNumMessagePingPongsPerCall; i++) {
396 grpc_core::MutexLock lock(&ping_pong_round_mu);
397 ping_pong_round_cv.SignalAll();
398 while (int(ping_pong_round) != i) {
399 ping_pong_round_cv.Wait(&ping_pong_round_mu);
402 SendMessage(test_call->call, test_call->cq);
403 ReceiveMessage(test_call->call, test_call->cq);
405 grpc_core::MutexLock lock(&ping_pong_round_mu);
407 ping_pong_round_cv.SignalAll();
410 gpr_log(GPR_DEBUG, "now receive status on call with server address:%s",
411 test_server->address().c_str());
412 FinishCall(test_call.get());
413 GPR_ASSERT(test_call->status.has_value());
414 GPR_ASSERT(test_call->status.value() == GRPC_STATUS_PERMISSION_DENIED);
416 grpc_core::ExecCtx exec_ctx;
417 fake_resolver_response_generator.reset();
421 for (size_t i = 1; i <= kNumMessagePingPongsPerCall; i++) {
423 grpc_core::MutexLock lock(&ping_pong_round_mu);
424 while (ping_pongs_done < ping_pong_round * kNumCalls) {
425 ping_pong_round_cv.Wait(&ping_pong_round_mu);
428 ping_pong_round_cv.SignalAll();
429 gpr_log(GPR_DEBUG, "initiate ping pong round: %ld", ping_pong_round);
432 for (auto& thread : threads) {
435 gpr_log(GPR_DEBUG, "All RPCs completed!");
440 int main(int argc, char** argv) {
441 ::testing::InitGoogleTest(&argc, argv);
442 grpc::testing::TestEnvironment env(argc, argv);
444 auto result = RUN_ALL_TESTS();