3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
21 #include <grpcpp/channel.h>
22 #include <grpcpp/client_context.h>
23 #include <grpcpp/create_channel.h>
24 #include <grpcpp/server.h>
25 #include <grpcpp/server_builder.h>
26 #include <grpcpp/server_context.h>
28 #include "absl/memory/memory.h"
30 #include "src/core/lib/gpr/tls.h"
31 #include "src/core/lib/iomgr/port.h"
32 #include "src/proto/grpc/testing/echo.grpc.pb.h"
33 #include "test/core/util/port.h"
34 #include "test/core/util/test_config.h"
36 #ifdef GRPC_POSIX_SOCKET
37 #include "src/core/lib/iomgr/ev_posix.h"
38 #endif // GRPC_POSIX_SOCKET
40 #include <gtest/gtest.h>
42 #ifdef GRPC_POSIX_SOCKET
43 // Thread-local variable to so that only polls from this test assert
44 // non-blocking (not polls from resolver, timer thread, etc), and only when the
45 // thread is waiting on polls caused by CompletionQueue::AsyncNext (not for
46 // picking a port or other reasons).
47 GPR_TLS_DECL(g_is_nonblocking_poll);
51 int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds,
53 // Only assert that this poll should have zero timeout if we're in the
54 // middle of a zero-timeout CQ Next.
55 if (gpr_tls_get(&g_is_nonblocking_poll)) {
56 GPR_ASSERT(timeout == 0);
58 return poll(pfds, nfds, timeout);
// Packs a small integer into an opaque pointer so it can be used as a
// completion-queue tag. The value is widened to intptr_t first so the
// integer-to-pointer conversion uses a pointer-sized integer.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
// Inverse of tag(): recovers the integer that was packed into a tag pointer.
// The pointer is read back as intptr_t and then narrowed to int.
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }
70 class NonblockingTest : public ::testing::Test {
74 void SetUp() override {
75 port_ = grpc_pick_unused_port_or_die();
76 server_address_ << "localhost:" << port_;
79 BuildAndStartServer();
82 bool LoopForTag(void** tag, bool* ok) {
83 // Temporarily set the thread-local nonblocking poll flag so that the polls
84 // caused by this loop are indeed sent by the library with zero timeout.
85 intptr_t orig_val = gpr_tls_get(&g_is_nonblocking_poll);
86 gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(true));
88 auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME));
89 if (r == CompletionQueue::SHUTDOWN) {
90 gpr_tls_set(&g_is_nonblocking_poll, orig_val);
92 } else if (r == CompletionQueue::GOT_EVENT) {
93 gpr_tls_set(&g_is_nonblocking_poll, orig_val);
99 void TearDown() override {
104 while (LoopForTag(&ignored_tag, &ignored_ok)) {
107 grpc_recycle_unused_port(port_);
110 void BuildAndStartServer() {
111 ServerBuilder builder;
112 builder.AddListeningPort(server_address_.str(),
113 grpc::InsecureServerCredentials());
115 absl::make_unique<grpc::testing::EchoTestService::AsyncService>();
116 builder.RegisterService(service_.get());
117 cq_ = builder.AddCompletionQueue();
118 server_ = builder.BuildAndStart();
122 std::shared_ptr<Channel> channel = grpc::CreateChannel(
123 server_address_.str(), grpc::InsecureChannelCredentials());
124 stub_ = grpc::testing::EchoTestService::NewStub(channel);
127 void SendRpc(int num_rpcs) {
128 for (int i = 0; i < num_rpcs; i++) {
129 EchoRequest send_request;
130 EchoRequest recv_request;
131 EchoResponse send_response;
132 EchoResponse recv_response;
135 ClientContext cli_ctx;
136 ServerContext srv_ctx;
137 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx);
139 send_request.set_message("hello non-blocking world");
140 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
141 stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get()));
143 response_reader->StartCall();
144 response_reader->Finish(&recv_response, &recv_status, tag(4));
146 service_->RequestEcho(&srv_ctx, &recv_request, &response_writer,
147 cq_.get(), cq_.get(), tag(2));
151 EXPECT_TRUE(LoopForTag(&got_tag, &ok));
153 EXPECT_EQ(detag(got_tag), 2);
154 EXPECT_EQ(send_request.message(), recv_request.message());
156 send_response.set_message(recv_request.message());
157 response_writer.Finish(send_response, Status::OK, tag(3));
161 EXPECT_TRUE(LoopForTag(&got_tag, &ok));
163 tagsum += detag(got_tag);
164 tagprod *= detag(got_tag);
166 EXPECT_TRUE(LoopForTag(&got_tag, &ok));
168 tagsum += detag(got_tag);
169 tagprod *= detag(got_tag);
171 EXPECT_EQ(tagsum, 7);
172 EXPECT_EQ(tagprod, 12);
173 EXPECT_EQ(send_response.message(), recv_response.message());
174 EXPECT_TRUE(recv_status.ok());
178 std::unique_ptr<ServerCompletionQueue> cq_;
179 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
180 std::unique_ptr<Server> server_;
181 std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_;
182 std::ostringstream server_address_;
186 TEST_F(NonblockingTest, SimpleRpc) {
192 } // namespace testing
195 #endif // GRPC_POSIX_SOCKET
// Test entry point. On POSIX-socket builds it installs the asserting poll
// hook, initializes the thread-local flag to false, runs all tests, and
// returns their result. On other builds the test is a no-op that returns 0.
//
// NOTE(review): `return ret;`, the `#else` branch body, the closing brace and
// the tail of the "non-zero timeout" comment were missing and are restored
// here — confirm against the upstream file.
int main(int argc, char** argv) {
#ifdef GRPC_POSIX_SOCKET
  // Override the poll function before anything else can happen
  grpc_poll_function = maybe_assert_non_blocking_poll;

  grpc::testing::TestEnvironment env(argc, argv);
  ::testing::InitGoogleTest(&argc, argv);
  gpr_tls_init(&g_is_nonblocking_poll);

  // Start the nonblocking poll thread-local variable as false because the
  // thread that issues RPCs starts by picking a port (which has non-zero
  // timeout).
  gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(false));

  int ret = RUN_ALL_TESTS();
  gpr_tls_destroy(&g_is_nonblocking_poll);
  return ret;
#else   // GRPC_POSIX_SOCKET
  // Nothing to test without POSIX sockets; mark the parameters as used so
  // this branch builds cleanly under unused-parameter warnings.
  (void)argc;
  (void)argv;
  return 0;
#endif  // GRPC_POSIX_SOCKET
}