3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
23 #include <gtest/gtest.h>
25 #include <grpc/grpc.h>
26 #include <grpc/support/time.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/create_channel.h>
30 #include <grpcpp/impl/codegen/sync.h>
31 #include <grpcpp/resource_quota.h>
32 #include <grpcpp/server.h>
33 #include <grpcpp/server_builder.h>
34 #include <grpcpp/server_context.h>
36 #include "src/core/lib/gpr/env.h"
37 #include "src/core/lib/surface/api_trace.h"
38 #include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h"
39 #include "src/proto/grpc/testing/echo.grpc.pb.h"
40 #include "test/core/util/port.h"
41 #include "test/core/util/test_config.h"
43 using grpc::testing::EchoRequest;
44 using grpc::testing::EchoResponse;
46 const int kNumThreads = 100; // Number of threads
47 const int kNumAsyncSendThreads = 2;
48 const int kNumAsyncReceiveThreads = 50;
49 const int kNumAsyncServerThreads = 50;
50 const int kNumRpcs = 1000; // Number of RPCs per thread
55 class TestServiceImpl : public ::grpc::testing::EchoTestService::Service {
59 Status Echo(ServerContext* /*context*/, const EchoRequest* request,
60 EchoResponse* response) override {
61 response->set_message(request->message());
66 template <class Service>
67 class CommonStressTest {
69 CommonStressTest() : kMaxMessageSize_(8192) {
71 // Workaround Apple CFStream bug
72 gpr_setenv("grpc_cfstream", "0");
75 virtual ~CommonStressTest() {}
76 virtual void SetUp() = 0;
77 virtual void TearDown() = 0;
78 virtual void ResetStub() = 0;
79 virtual bool AllowExhaustion() = 0;
80 grpc::testing::EchoTestService::Stub* GetStub() { return stub_.get(); }
83 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
84 std::unique_ptr<Server> server_;
86 virtual void SetUpStart(ServerBuilder* builder, Service* service) = 0;
87 void SetUpStartCommon(ServerBuilder* builder, Service* service) {
88 builder->RegisterService(service);
89 builder->SetMaxMessageSize(
90 kMaxMessageSize_); // For testing max message size.
92 void SetUpEnd(ServerBuilder* builder) { server_ = builder->BuildAndStart(); }
93 void TearDownStart() { server_->Shutdown(); }
97 const int kMaxMessageSize_;
100 template <class Service>
101 class CommonStressTestInsecure : public CommonStressTest<Service> {
103 void ResetStub() override {
104 std::shared_ptr<Channel> channel = grpc::CreateChannel(
105 server_address_.str(), InsecureChannelCredentials());
106 this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
108 bool AllowExhaustion() override { return false; }
111 void SetUpStart(ServerBuilder* builder, Service* service) override {
112 int port = grpc_pick_unused_port_or_die();
113 this->server_address_ << "localhost:" << port;
115 builder->AddListeningPort(server_address_.str(),
116 InsecureServerCredentials());
117 this->SetUpStartCommon(builder, service);
121 std::ostringstream server_address_;
124 template <class Service, bool allow_resource_exhaustion>
125 class CommonStressTestInproc : public CommonStressTest<Service> {
127 void ResetStub() override {
128 ChannelArguments args;
129 std::shared_ptr<Channel> channel = this->server_->InProcessChannel(args);
130 this->stub_ = grpc::testing::EchoTestService::NewStub(channel);
132 bool AllowExhaustion() override { return allow_resource_exhaustion; }
135 void SetUpStart(ServerBuilder* builder, Service* service) override {
136 this->SetUpStartCommon(builder, service);
140 template <class BaseClass>
141 class CommonStressTestSyncServer : public BaseClass {
143 void SetUp() override {
144 ServerBuilder builder;
145 this->SetUpStart(&builder, &service_);
146 this->SetUpEnd(&builder);
148 void TearDown() override {
149 this->TearDownStart();
154 TestServiceImpl service_;
157 template <class BaseClass>
158 class CommonStressTestSyncServerLowThreadCount : public BaseClass {
160 void SetUp() override {
161 ServerBuilder builder;
163 this->SetUpStart(&builder, &service_);
164 quota.SetMaxThreads(4);
165 builder.SetResourceQuota(quota);
166 this->SetUpEnd(&builder);
168 void TearDown() override {
169 this->TearDownStart();
174 TestServiceImpl service_;
177 template <class BaseClass>
178 class CommonStressTestAsyncServer : public BaseClass {
180 CommonStressTestAsyncServer() : contexts_(kNumAsyncServerThreads * 100) {}
181 void SetUp() override {
182 shutting_down_ = false;
183 ServerBuilder builder;
184 this->SetUpStart(&builder, &service_);
185 cq_ = builder.AddCompletionQueue();
186 this->SetUpEnd(&builder);
187 for (int i = 0; i < kNumAsyncServerThreads * 100; i++) {
190 for (int i = 0; i < kNumAsyncServerThreads; i++) {
191 server_threads_.emplace_back(&CommonStressTestAsyncServer::ProcessRpcs,
195 void TearDown() override {
197 grpc::internal::MutexLock l(&mu_);
198 this->TearDownStart();
199 shutting_down_ = true;
203 for (int i = 0; i < kNumAsyncServerThreads; i++) {
204 server_threads_[i].join();
209 while (cq_->Next(&ignored_tag, &ignored_ok)) {
218 while (cq_->Next(&tag, &ok)) {
220 int i = static_cast<int>(reinterpret_cast<intptr_t>(tag));
221 switch (contexts_[i].state) {
222 case Context::READY: {
223 contexts_[i].state = Context::DONE;
224 EchoResponse send_response;
225 send_response.set_message(contexts_[i].recv_request.message());
226 contexts_[i].response_writer->Finish(send_response, Status::OK,
237 void RefreshContext(int i) {
238 grpc::internal::MutexLock l(&mu_);
239 if (!shutting_down_) {
240 contexts_[i].state = Context::READY;
241 contexts_[i].srv_ctx.reset(new ServerContext);
242 contexts_[i].response_writer.reset(
243 new grpc::ServerAsyncResponseWriter<EchoResponse>(
244 contexts_[i].srv_ctx.get()));
245 service_.RequestEcho(contexts_[i].srv_ctx.get(),
246 &contexts_[i].recv_request,
247 contexts_[i].response_writer.get(), cq_.get(),
248 cq_.get(), reinterpret_cast<void*>(i));
252 std::unique_ptr<ServerContext> srv_ctx;
253 std::unique_ptr<grpc::ServerAsyncResponseWriter<EchoResponse>>
255 EchoRequest recv_request;
256 enum { READY, DONE } state;
258 std::vector<Context> contexts_;
259 ::grpc::testing::EchoTestService::AsyncService service_;
260 std::unique_ptr<ServerCompletionQueue> cq_;
262 grpc::internal::Mutex mu_;
263 std::vector<std::thread> server_threads_;
266 template <class Common>
267 class End2endTest : public ::testing::Test {
270 void SetUp() override { common_.SetUp(); }
271 void TearDown() override { common_.TearDown(); }
272 void ResetStub() { common_.ResetStub(); }
277 static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs,
278 bool allow_exhaustion, gpr_atm* errors) {
280 EchoResponse response;
281 request.set_message("Hello");
283 for (int i = 0; i < num_rpcs; ++i) {
284 ClientContext context;
285 Status s = stub->Echo(&context, request, &response);
286 EXPECT_TRUE(s.ok() || (allow_exhaustion &&
287 s.error_code() == StatusCode::RESOURCE_EXHAUSTED));
289 if (!(allow_exhaustion &&
290 s.error_code() == StatusCode::RESOURCE_EXHAUSTED)) {
291 gpr_log(GPR_ERROR, "RPC error: %d: %s", s.error_code(),
292 s.error_message().c_str());
294 gpr_atm_no_barrier_fetch_add(errors, static_cast<gpr_atm>(1));
296 EXPECT_EQ(response.message(), request.message());
301 typedef ::testing::Types<
302 CommonStressTestSyncServer<CommonStressTestInsecure<TestServiceImpl>>,
303 CommonStressTestSyncServer<CommonStressTestInproc<TestServiceImpl, false>>,
304 CommonStressTestSyncServerLowThreadCount<
305 CommonStressTestInproc<TestServiceImpl, true>>,
306 CommonStressTestAsyncServer<
307 CommonStressTestInsecure<grpc::testing::EchoTestService::AsyncService>>,
308 CommonStressTestAsyncServer<CommonStressTestInproc<
309 grpc::testing::EchoTestService::AsyncService, false>>>
311 TYPED_TEST_SUITE(End2endTest, CommonTypes);
312 TYPED_TEST(End2endTest, ThreadStress) {
313 this->common_.ResetStub();
314 std::vector<std::thread> threads;
316 gpr_atm_rel_store(&errors, static_cast<gpr_atm>(0));
317 threads.reserve(kNumThreads);
318 for (int i = 0; i < kNumThreads; ++i) {
319 threads.emplace_back(SendRpc, this->common_.GetStub(), kNumRpcs,
320 this->common_.AllowExhaustion(), &errors);
322 for (int i = 0; i < kNumThreads; ++i) {
325 uint64_t error_cnt = static_cast<uint64_t>(gpr_atm_no_barrier_load(&errors));
326 if (error_cnt != 0) {
327 gpr_log(GPR_INFO, "RPC error count: %" PRIu64, error_cnt);
329 // If this test allows resource exhaustion, expect that it actually sees some
330 if (this->common_.AllowExhaustion()) {
331 EXPECT_GT(error_cnt, static_cast<uint64_t>(0));
335 template <class Common>
336 class AsyncClientEnd2endTest : public ::testing::Test {
338 AsyncClientEnd2endTest() : rpcs_outstanding_(0) {}
340 void SetUp() override { common_.SetUp(); }
341 void TearDown() override {
344 while (cq_.Next(&ignored_tag, &ignored_ok)) {
350 grpc::internal::MutexLock l(&mu_);
351 while (rpcs_outstanding_ != 0) {
358 struct AsyncClientCall {
359 EchoResponse response;
360 ClientContext context;
362 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader;
365 void AsyncSendRpc(int num_rpcs) {
366 for (int i = 0; i < num_rpcs; ++i) {
367 AsyncClientCall* call = new AsyncClientCall;
369 request.set_message("Hello: " + std::to_string(i));
370 call->response_reader =
371 common_.GetStub()->AsyncEcho(&call->context, request, &cq_);
372 call->response_reader->Finish(&call->response, &call->status, call);
374 grpc::internal::MutexLock l(&mu_);
379 void AsyncCompleteRpc() {
383 if (!cq_.Next(&got_tag, &ok)) break;
384 AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag);
386 gpr_log(GPR_DEBUG, "Error: %d", call->status.error_code());
392 grpc::internal::MutexLock l(&mu_);
394 notify = (rpcs_outstanding_ == 0);
404 grpc::internal::Mutex mu_;
405 grpc::internal::CondVar cv_;
406 int rpcs_outstanding_;
409 TYPED_TEST_SUITE(AsyncClientEnd2endTest, CommonTypes);
410 TYPED_TEST(AsyncClientEnd2endTest, ThreadStress) {
411 this->common_.ResetStub();
412 std::vector<std::thread> send_threads, completion_threads;
413 for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
414 completion_threads.emplace_back(
415 &AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncCompleteRpc,
418 for (int i = 0; i < kNumAsyncSendThreads; ++i) {
419 send_threads.emplace_back(
420 &AsyncClientEnd2endTest_ThreadStress_Test<TypeParam>::AsyncSendRpc,
423 for (int i = 0; i < kNumAsyncSendThreads; ++i) {
424 send_threads[i].join();
428 for (int i = 0; i < kNumAsyncReceiveThreads; ++i) {
429 completion_threads[i].join();
433 } // namespace testing
436 int main(int argc, char** argv) {
437 grpc::testing::TestEnvironment env(argc, argv);
438 ::testing::InitGoogleTest(&argc, argv);
439 return RUN_ALL_TESTS();