3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include <grpc/support/log.h>
20 #include <grpcpp/channel.h>
21 #include <grpcpp/create_channel.h>
22 #include <grpcpp/impl/grpc_library.h>
23 #include <grpcpp/security/credentials.h>
24 #include <grpcpp/security/server_credentials.h>
25 #include <grpcpp/server.h>
26 #include <grpcpp/server_builder.h>
27 #include <gtest/gtest.h>
29 #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/iomgr/endpoint.h"
32 #include "src/core/lib/iomgr/endpoint_pair.h"
33 #include "src/core/lib/iomgr/exec_ctx.h"
34 #include "src/core/lib/iomgr/tcp_posix.h"
35 #include "src/core/lib/surface/channel.h"
36 #include "src/core/lib/surface/completion_queue.h"
37 #include "src/core/lib/surface/server.h"
38 #include "test/core/util/passthru_endpoint.h"
39 #include "test/core/util/port.h"
41 #include "src/cpp/client/create_channel_internal.h"
42 #include "src/proto/grpc/testing/echo.grpc.pb.h"
43 #include "test/core/util/test_config.h"
// Reinterprets an integer as an opaque void* so it can be handed to the
// completion queue as an operation tag. Callers in this file pass small
// integers (0, 1, 3, 4) and later compare or convert the pointer returned by
// the completion queue back to the integer.
48 static void* tag(intptr_t x) { return reinterpret_cast<void*>(x); }
// Applies the server-side settings shared by the fixtures in this file: both
// the maximum receive and the maximum send message size on the builder `b` are
// set to INT_MAX.
// `b` is dereferenced unconditionally, so it must not be null.
50 static void ApplyCommonServerBuilderConfig(ServerBuilder* b) {
51 b->SetMaxReceiveMessageSize(INT_MAX);
52 b->SetMaxSendMessageSize(INT_MAX);
// Applies the client-side counterpart of the common configuration: the
// GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH and GRPC_ARG_MAX_SEND_MESSAGE_LENGTH
// channel arguments on `c` are both set to INT_MAX.
// `c` is dereferenced unconditionally, so it must not be null.
55 static void ApplyCommonChannelArguments(ChannelArguments* c) {
56 c->SetInt(GRPC_ARG_MAX_RECEIVE_MESSAGE_LENGTH, INT_MAX);
57 c->SetInt(GRPC_ARG_MAX_SEND_MESSAGE_LENGTH, INT_MAX);
// File-local helper that owns the state shared by every fixture in this file:
// a GrpcLibrary handle and a resource quota. The quota is handed out through
// rq() and is consumed when the passthru endpoint pair is created.
// NOTE(review): parts of this definition (constructor header, access
// specifiers, end of the class) are not visible in this view.
60 static class InitializeStuff {
// Creates the resource quota, named "bm".
64 rq_ = grpc_resource_quota_create("bm");
// Shuts the library handle down when the object is destroyed.
// NOTE(review): no release of rq_ appears among the visible lines - confirm
// whether the quota is intentionally kept for the life of the process.
67 ~InitializeStuff() { init_lib_.shutdown(); }
// Returns the resource quota pointer; ownership stays with this object.
69 grpc_resource_quota* rq() { return rq_; }
72 internal::GrpcLibrary init_lib_;
73 grpc_resource_quota* rq_;
// Test fixture that wires a server and a client channel together over an
// already-created grpc_endpoint_pair instead of a listening port. It exposes
// the server completion queue (cq()) and the client channel (channel()).
// NOTE(review): several lines of this class (the builder declaration, scoping
// braces, parts of the destructor and of the CreateChannelInternal call) are
// not visible in this view; the comments below describe only what is shown.
76 class EndpointPairFixture {
// Builds and starts a server hosting `service`, attaches endpoints.server to
// it as a chttp2 server transport, then creates a direct client channel on
// top of endpoints.client.
78 EndpointPairFixture(Service* service, grpc_endpoint_pair endpoints) {
// Server setup: one completion queue, the service under test, and the common
// message-size configuration.
80 cq_ = b.AddCompletionQueue(true);
81 b.RegisterService(service);
82 ApplyCommonServerBuilderConfig(&b);
83 server_ = b.BuildAndStart();
// An ExecCtx is created before the core transport/server calls below.
85 grpc_core::ExecCtx exec_ctx;
87 /* add server endpoint to server_ */
// The server transport is created with the server's own channel args.
// NOTE(review): unlike the client transport further down, this transport is
// not asserted to be non-null in the visible lines.
89 const grpc_channel_args* server_args =
90 grpc_server_get_channel_args(server_->c_server());
91 grpc_transport* transport = grpc_create_chttp2_transport(
92 server_args, endpoints.server, false /* is_client */);
// Register the server endpoint with every pollset the server reports.
94 grpc_pollset** pollsets;
95 size_t num_pollsets = 0;
96 grpc_server_get_pollsets(server_->c_server(), &pollsets, &num_pollsets);
98 for (size_t i = 0; i < num_pollsets; i++) {
99 grpc_endpoint_add_to_pollset(endpoints.server, pollsets[i]);
// Hand the transport to the server and start reading from it.
102 grpc_server_setup_transport(server_->c_server(), transport, nullptr,
103 server_args, nullptr);
104 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
// Client side: channel args carry a fixed default authority plus the common
// message-size limits.
109 ChannelArguments args;
110 args.SetString(GRPC_ARG_DEFAULT_AUTHORITY, "test.authority");
111 ApplyCommonChannelArguments(&args);
// Create the client transport on endpoints.client, wrap it in a direct
// client channel with target "target", and start reading.
113 grpc_channel_args c_args = args.c_channel_args();
114 grpc_transport* transport =
115 grpc_create_chttp2_transport(&c_args, endpoints.client, true);
116 GPR_ASSERT(transport);
117 grpc_channel* channel = grpc_channel_create(
118 "target", &c_args, GRPC_CLIENT_DIRECT_CHANNEL, transport);
119 grpc_chttp2_transport_start_reading(transport, nullptr, nullptr);
// Wrap the result in the C++ Channel stored in channel_, with an empty list of
// client interceptor factories (leading arguments not visible here).
121 channel_ = ::grpc::CreateChannelInternal(
123 std::vector<std::unique_ptr<
124 experimental::ClientInterceptorFactoryInterface>>());
// Destructor: the visible part drains the completion queue by calling Next()
// until it returns false.
128 virtual ~EndpointPairFixture() {
133 while (cq_->Next(&tag, &ok)) {
// Non-owning access to the server completion queue.
137 ServerCompletionQueue* cq() { return cq_.get(); }
// Shared handle to the client channel.
138 std::shared_ptr<Channel> channel() { return channel_; }
141 std::unique_ptr<Server> server_;
142 std::unique_ptr<ServerCompletionQueue> cq_;
143 std::shared_ptr<Channel> channel_;
// EndpointPairFixture specialisation whose endpoint pair is a passthru pair
// created by MakeEndpoints(). It keeps the passthru stats object so a test can
// read how many writes were performed.
// NOTE(review): the tail of this class is not visible in this view (the
// MakeEndpoints call below is cut off mid-argument-list).
146 class InProcessCHTTP2 : public EndpointPairFixture {
// `stats` is passed to MakeEndpoints() for the base class and also stored in
// stats_; the destructor below destroys it, so this object takes it over.
148 InProcessCHTTP2(Service* service, grpc_passthru_endpoint_stats* stats)
149 : EndpointPairFixture(service, MakeEndpoints(stats)), stats_(stats) {}
// Destroys the stats object if one was supplied.
151 virtual ~InProcessCHTTP2() {
152 if (stats_ != nullptr) {
153 grpc_passthru_endpoint_stats_destroy(stats_);
// Returns the write counter from the stats object.
// NOTE(review): stats_ is dereferenced without the null check the destructor
// performs - callers must construct the fixture with non-null stats.
157 int writes_performed() const { return stats_->num_writes; }
160 grpc_passthru_endpoint_stats* stats_;
// Creates the client/server passthru endpoint pair using the file-wide
// resource quota; remaining arguments are not visible here.
162 static grpc_endpoint_pair MakeEndpoints(grpc_passthru_endpoint_stats* stats) {
163 grpc_endpoint_pair p;
164 grpc_passthru_endpoint_create(&p.client, &p.server, initialize_stuff.rq(),
// Runs kIterations unary Echo RPCs over an InProcessCHTTP2 fixture and returns
// the fixture's write count divided by kIterations.
//
// request_size / response_size: when greater than zero, the request / response
// message is filled with that many 'a' characters; otherwise it is left
// unset.
//
// NOTE(review): several lines of this function (the ServerEnv struct header
// and its context member, some local declarations, loop bookkeeping) are not
// visible in this view; comments describe only what is shown.
170 static double UnaryPingPong(int request_size, int response_size) {
171 const int kIterations = 10000;
// The fixture takes over the freshly created passthru stats object.
173 EchoTestService::AsyncService service;
174 std::unique_ptr<InProcessCHTTP2> fixture(
175 new InProcessCHTTP2(&service, grpc_passthru_endpoint_stats_create()));
176 EchoRequest send_request;
177 EchoResponse send_response;
178 EchoResponse recv_response;
179 if (request_size > 0) {
180 send_request.set_message(std::string(request_size, 'a'));
182 if (response_size > 0) {
183 send_response.set_message(std::string(response_size, 'a'));
// Per-call server state: the received request and the response writer, which
// is bound to the env's context member `ctx`.
188 EchoRequest recv_request;
189 grpc::ServerAsyncResponseWriter<EchoResponse> response_writer;
190 ServerEnv() : response_writer(&ctx) {}
// Two ServerEnv slots are placement-constructed inside a raw byte buffer so
// each can be rebuilt in place after it has served a call.
// NOTE(review): the buffer is a plain uint8_t array with no alignas visible;
// confirm it is suitably aligned for ServerEnv.
192 uint8_t server_env_buffer[2 * sizeof(ServerEnv)];
193 ServerEnv* server_env[2] = {
194 reinterpret_cast<ServerEnv*>(server_env_buffer),
195 reinterpret_cast<ServerEnv*>(server_env_buffer + sizeof(ServerEnv))};
196 new (server_env[0]) ServerEnv;
197 new (server_env[1]) ServerEnv;
// Post one pending Echo request per slot; the tag value is the slot index.
198 service.RequestEcho(&server_env[0]->ctx, &server_env[0]->recv_request,
199 &server_env[0]->response_writer, fixture->cq(),
200 fixture->cq(), tag(0));
201 service.RequestEcho(&server_env[1]->ctx, &server_env[1]->recv_request,
202 &server_env[1]->response_writer, fixture->cq(),
203 fixture->cq(), tag(1));
204 std::unique_ptr<EchoTestService::Stub> stub(
205 EchoTestService::NewStub(fixture->channel()));
206 for (int iteration = 0; iteration < kIterations; iteration++) {
207 recv_response.Clear();
// Client side: start the call and ask for its result under tag 4. The client
// uses the same completion queue as the server.
208 ClientContext cli_ctx;
209 std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader(
210 stub->AsyncEcho(&cli_ctx, send_request, fixture->cq()));
213 response_reader->Finish(&recv_response, &recv_status, tag(4));
// The next event must be one of the two posted server requests (tag 0 or 1);
// the tag identifies which slot received the call.
214 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
216 GPR_ASSERT(t == tag(0) || t == tag(1));
217 intptr_t slot = reinterpret_cast<intptr_t>(t);
218 ServerEnv* senv = server_env[slot];
// Server side: reply with OK under tag 3.
219 senv->response_writer.Finish(send_response, Status::OK, tag(3));
// `i` is a bitmask with bits 3 and 4 set, one per outstanding tag (server
// Finish and client Finish); each event must correspond to a bit still set.
// The statement that updates `i` is not visible in this view.
220 for (int i = (1 << 3) | (1 << 4); i != 0;) {
221 GPR_ASSERT(fixture->cq()->Next(&t, &ok));
223 int tagnum = static_cast<int>(reinterpret_cast<intptr_t>(t));
224 GPR_ASSERT(i & (1 << tagnum));
227 GPR_ASSERT(recv_status.ok());
// Rebuild the used slot in place and post it again under the same tag.
// NOTE(review): any destructor call preceding this placement new is not
// visible in this view - confirm the old ServerEnv is destroyed first.
230 senv = new (senv) ServerEnv();
231 service.RequestEcho(&senv->ctx, &senv->recv_request, &senv->response_writer,
232 fixture->cq(), fixture->cq(), tag(slot));
// Average number of writes recorded by the fixture per RPC.
235 double writes_per_iteration =
236 static_cast<double>(fixture->writes_performed()) /
237 static_cast<double>(kIterations);
// The slots were placement-constructed, so they are destroyed explicitly.
240 server_env[0]->~ServerEnv();
241 server_env[1]->~ServerEnv();
243 return writes_per_iteration;
// Checks the average writes per unary RPC stay below fixed bounds:
// under 2.05 when both messages are empty or one of them carries a single
// character, and under 2.5 when one of them carries 4096 characters.
246 TEST(WritesPerRpcTest, UnaryPingPong) {
247 EXPECT_LT(UnaryPingPong(0, 0), 2.05);
248 EXPECT_LT(UnaryPingPong(1, 0), 2.05);
249 EXPECT_LT(UnaryPingPong(0, 1), 2.05);
250 EXPECT_LT(UnaryPingPong(4096, 0), 2.5);
251 EXPECT_LT(UnaryPingPong(0, 4096), 2.5);
254 } // namespace testing
// Test entry point: constructs the gRPC TestEnvironment from the command-line
// arguments, initialises GoogleTest with the same arguments, and returns the
// result of running all registered tests.
257 int main(int argc, char** argv) {
258 grpc::testing::TestEnvironment env(argc, argv);
259 ::testing::InitGoogleTest(&argc, argv);
260 return RUN_ALL_TESTS();