8c4b3cf1fd3e33f428b937f50d04d99b27d0917d
[platform/upstream/grpc.git] / test / cpp / end2end / generic_end2end_test.cc
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <memory>
20 #include <thread>
21
22 #include <grpc/grpc.h>
23 #include <grpc/support/time.h>
24 #include <grpcpp/channel.h>
25 #include <grpcpp/client_context.h>
26 #include <grpcpp/create_channel.h>
27 #include <grpcpp/generic/async_generic_service.h>
28 #include <grpcpp/generic/generic_stub.h>
29 #include <grpcpp/impl/codegen/proto_utils.h>
30 #include <grpcpp/server.h>
31 #include <grpcpp/server_builder.h>
32 #include <grpcpp/server_context.h>
33 #include <grpcpp/support/slice.h>
34
35 #include "src/proto/grpc/testing/echo.grpc.pb.h"
36 #include "test/core/util/port.h"
37 #include "test/core/util/test_config.h"
38 #include "test/cpp/util/byte_buffer_proto_helper.h"
39
40 #include <gtest/gtest.h>
41
42 using grpc::testing::EchoRequest;
43 using grpc::testing::EchoResponse;
44 using std::chrono::system_clock;
45
46 namespace grpc {
47 namespace testing {
48 namespace {
49
// Turns a small integer into the opaque void* tag attached to completion-queue
// operations, so the tests can refer to events by number.
void* tag(int i) {
  // Widen to intptr_t first so the integer has pointer width before it is
  // reinterpreted as a pointer value.
  return reinterpret_cast<void*>(static_cast<intptr_t>(i));
}
51
52 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
53   bool ok;
54   void* got_tag;
55   EXPECT_TRUE(cq->Next(&got_tag, &ok));
56   EXPECT_EQ(expect_ok, ok);
57   EXPECT_EQ(tag(i), got_tag);
58 }
59
60 class GenericEnd2endTest : public ::testing::Test {
61  protected:
62   GenericEnd2endTest() : server_host_("localhost") {}
63
64   void SetUp() override {
65     int port = grpc_pick_unused_port_or_die();
66     server_address_ << server_host_ << ":" << port;
67     // Setup server
68     ServerBuilder builder;
69     builder.AddListeningPort(server_address_.str(),
70                              InsecureServerCredentials());
71     builder.RegisterAsyncGenericService(&generic_service_);
72     // Include a second call to RegisterAsyncGenericService to make sure that
73     // we get an error in the log, since it is not allowed to have 2 async
74     // generic services
75     builder.RegisterAsyncGenericService(&generic_service_);
76     srv_cq_ = builder.AddCompletionQueue();
77     server_ = builder.BuildAndStart();
78   }
79
80   void TearDown() override {
81     server_->Shutdown();
82     void* ignored_tag;
83     bool ignored_ok;
84     cli_cq_.Shutdown();
85     srv_cq_->Shutdown();
86     while (cli_cq_.Next(&ignored_tag, &ignored_ok))
87       ;
88     while (srv_cq_->Next(&ignored_tag, &ignored_ok))
89       ;
90   }
91
92   void ResetStub() {
93     std::shared_ptr<Channel> channel =
94         CreateChannel(server_address_.str(), InsecureChannelCredentials());
95     generic_stub_.reset(new GenericStub(channel));
96   }
97
98   void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
99   void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
100   void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
101   void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
102
103   void SendRpc(int num_rpcs) {
104     SendRpc(num_rpcs, false, gpr_inf_future(GPR_CLOCK_MONOTONIC));
105   }
106
107   void SendRpc(int num_rpcs, bool check_deadline, gpr_timespec deadline) {
108     const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
109     for (int i = 0; i < num_rpcs; i++) {
110       EchoRequest send_request;
111       EchoRequest recv_request;
112       EchoResponse send_response;
113       EchoResponse recv_response;
114       Status recv_status;
115
116       ClientContext cli_ctx;
117       GenericServerContext srv_ctx;
118       GenericServerAsyncReaderWriter stream(&srv_ctx);
119
120       // The string needs to be long enough to test heap-based slice.
121       send_request.set_message("Hello world. Hello world. Hello world.");
122
123       if (check_deadline) {
124         cli_ctx.set_deadline(deadline);
125       }
126
127       std::unique_ptr<GenericClientAsyncReaderWriter> call =
128           generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
129       call->StartCall(tag(1));
130       client_ok(1);
131       std::unique_ptr<ByteBuffer> send_buffer =
132           SerializeToByteBuffer(&send_request);
133       call->Write(*send_buffer, tag(2));
134       // Send ByteBuffer can be destroyed after calling Write.
135       send_buffer.reset();
136       client_ok(2);
137       call->WritesDone(tag(3));
138       client_ok(3);
139
140       generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
141                                    srv_cq_.get(), tag(4));
142
143       verify_ok(srv_cq_.get(), 4, true);
144       EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
145       EXPECT_EQ(kMethodName, srv_ctx.method());
146
147       if (check_deadline) {
148         EXPECT_TRUE(gpr_time_similar(deadline, srv_ctx.raw_deadline(),
149                                      gpr_time_from_millis(1000, GPR_TIMESPAN)));
150       }
151
152       ByteBuffer recv_buffer;
153       stream.Read(&recv_buffer, tag(5));
154       server_ok(5);
155       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
156       EXPECT_EQ(send_request.message(), recv_request.message());
157
158       send_response.set_message(recv_request.message());
159       send_buffer = SerializeToByteBuffer(&send_response);
160       stream.Write(*send_buffer, tag(6));
161       send_buffer.reset();
162       server_ok(6);
163
164       stream.Finish(Status::OK, tag(7));
165       server_ok(7);
166
167       recv_buffer.Clear();
168       call->Read(&recv_buffer, tag(8));
169       client_ok(8);
170       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
171
172       call->Finish(&recv_status, tag(9));
173       client_ok(9);
174
175       EXPECT_EQ(send_response.message(), recv_response.message());
176       EXPECT_TRUE(recv_status.ok());
177     }
178   }
179
180   CompletionQueue cli_cq_;
181   std::unique_ptr<ServerCompletionQueue> srv_cq_;
182   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
183   std::unique_ptr<grpc::GenericStub> generic_stub_;
184   std::unique_ptr<Server> server_;
185   AsyncGenericService generic_service_;
186   const grpc::string server_host_;
187   std::ostringstream server_address_;
188 };
189
190 TEST_F(GenericEnd2endTest, SimpleRpc) {
191   ResetStub();
192   SendRpc(1);
193 }
194
195 TEST_F(GenericEnd2endTest, SequentialRpcs) {
196   ResetStub();
197   SendRpc(10);
198 }
199
200 TEST_F(GenericEnd2endTest, SequentialUnaryRpcs) {
201   ResetStub();
202   const int num_rpcs = 10;
203   const grpc::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
204   for (int i = 0; i < num_rpcs; i++) {
205     EchoRequest send_request;
206     EchoRequest recv_request;
207     EchoResponse send_response;
208     EchoResponse recv_response;
209     Status recv_status;
210
211     ClientContext cli_ctx;
212     GenericServerContext srv_ctx;
213     GenericServerAsyncReaderWriter stream(&srv_ctx);
214
215     // The string needs to be long enough to test heap-based slice.
216     send_request.set_message("Hello world. Hello world. Hello world.");
217
218     std::unique_ptr<ByteBuffer> cli_send_buffer =
219         SerializeToByteBuffer(&send_request);
220     // Use the same cq as server so that events can be polled in time.
221     std::unique_ptr<GenericClientAsyncResponseReader> call =
222         generic_stub_->PrepareUnaryCall(&cli_ctx, kMethodName,
223                                         *cli_send_buffer.get(), &cli_cq_);
224     call->StartCall();
225     ByteBuffer cli_recv_buffer;
226     call->Finish(&cli_recv_buffer, &recv_status, tag(1));
227     std::thread client_check([this] { client_ok(1); });
228
229     generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
230                                  srv_cq_.get(), tag(4));
231
232     server_ok(4);
233     EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
234     EXPECT_EQ(kMethodName, srv_ctx.method());
235
236     ByteBuffer srv_recv_buffer;
237     stream.Read(&srv_recv_buffer, tag(5));
238     server_ok(5);
239     EXPECT_TRUE(ParseFromByteBuffer(&srv_recv_buffer, &recv_request));
240     EXPECT_EQ(send_request.message(), recv_request.message());
241
242     send_response.set_message(recv_request.message());
243     std::unique_ptr<ByteBuffer> srv_send_buffer =
244         SerializeToByteBuffer(&send_response);
245     stream.Write(*srv_send_buffer, tag(6));
246     server_ok(6);
247
248     stream.Finish(Status::OK, tag(7));
249     server_ok(7);
250
251     client_check.join();
252     EXPECT_TRUE(ParseFromByteBuffer(&cli_recv_buffer, &recv_response));
253     EXPECT_EQ(send_response.message(), recv_response.message());
254     EXPECT_TRUE(recv_status.ok());
255   }
256 }
257
258 // One ping, one pong.
259 TEST_F(GenericEnd2endTest, SimpleBidiStreaming) {
260   ResetStub();
261
262   const grpc::string kMethodName(
263       "/grpc.cpp.test.util.EchoTestService/BidiStream");
264   EchoRequest send_request;
265   EchoRequest recv_request;
266   EchoResponse send_response;
267   EchoResponse recv_response;
268   Status recv_status;
269   ClientContext cli_ctx;
270   GenericServerContext srv_ctx;
271   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
272
273   cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
274   send_request.set_message("Hello");
275   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
276       generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
277   cli_stream->StartCall(tag(1));
278   client_ok(1);
279
280   generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
281                                srv_cq_.get(), tag(2));
282
283   verify_ok(srv_cq_.get(), 2, true);
284   EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
285   EXPECT_EQ(kMethodName, srv_ctx.method());
286
287   std::unique_ptr<ByteBuffer> send_buffer =
288       SerializeToByteBuffer(&send_request);
289   cli_stream->Write(*send_buffer, tag(3));
290   send_buffer.reset();
291   client_ok(3);
292
293   ByteBuffer recv_buffer;
294   srv_stream.Read(&recv_buffer, tag(4));
295   server_ok(4);
296   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
297   EXPECT_EQ(send_request.message(), recv_request.message());
298
299   send_response.set_message(recv_request.message());
300   send_buffer = SerializeToByteBuffer(&send_response);
301   srv_stream.Write(*send_buffer, tag(5));
302   send_buffer.reset();
303   server_ok(5);
304
305   cli_stream->Read(&recv_buffer, tag(6));
306   client_ok(6);
307   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
308   EXPECT_EQ(send_response.message(), recv_response.message());
309
310   cli_stream->WritesDone(tag(7));
311   client_ok(7);
312
313   srv_stream.Read(&recv_buffer, tag(8));
314   server_fail(8);
315
316   srv_stream.Finish(Status::OK, tag(9));
317   server_ok(9);
318
319   cli_stream->Finish(&recv_status, tag(10));
320   client_ok(10);
321
322   EXPECT_EQ(send_response.message(), recv_response.message());
323   EXPECT_TRUE(recv_status.ok());
324 }
325
326 TEST_F(GenericEnd2endTest, Deadline) {
327   ResetStub();
328   SendRpc(1, true,
329           gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
330                        gpr_time_from_seconds(10, GPR_TIMESPAN)));
331 }
332
333 }  // namespace
334 }  // namespace testing
335 }  // namespace grpc
336
337 int main(int argc, char** argv) {
338   grpc::testing::TestEnvironment env(argc, argv);
339   ::testing::InitGoogleTest(&argc, argv);
340   return RUN_ALL_TESTS();
341 }