Imported Upstream version 1.33.0
[platform/upstream/grpc.git] / test / cpp / end2end / filter_end2end_test.cc
1 /*
2  *
3  * Copyright 2016 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <memory>
20 #include <mutex>
21 #include <thread>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/time.h>
25 #include <grpcpp/channel.h>
26 #include <grpcpp/client_context.h>
27 #include <grpcpp/create_channel.h>
28 #include <grpcpp/generic/async_generic_service.h>
29 #include <grpcpp/generic/generic_stub.h>
30 #include <grpcpp/impl/codegen/proto_utils.h>
31 #include <grpcpp/server.h>
32 #include <grpcpp/server_builder.h>
33 #include <grpcpp/server_context.h>
34 #include <grpcpp/support/config.h>
35 #include <grpcpp/support/slice.h>
36
37 #include "absl/memory/memory.h"
38
39 #include "src/cpp/common/channel_filter.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/util/byte_buffer_proto_helper.h"
44
45 #include <gtest/gtest.h>
46
47 using grpc::testing::EchoRequest;
48 using grpc::testing::EchoResponse;
49 using std::chrono::system_clock;
50
51 namespace grpc {
52 namespace testing {
53 namespace {
54
// Encodes a small integer as an opaque completion-queue tag.
//
// The value is widened to intptr_t first so the integer-to-pointer conversion
// is size-matched. Within this file the returned pointer is only compared
// against tags handed back by CompletionQueue::Next().
void* tag(int i) {
  return reinterpret_cast<void*>(static_cast<intptr_t>(i));
}
56
57 void verify_ok(CompletionQueue* cq, int i, bool expect_ok) {
58   bool ok;
59   void* got_tag;
60   EXPECT_TRUE(cq->Next(&got_tag, &ok));
61   EXPECT_EQ(expect_ok, ok);
62   EXPECT_EQ(tag(i), got_tag);
63 }
64
namespace {

// Shared bookkeeping for the test filter. A single mutex protects both
// counters; every accessor below takes it for the duration of the access.
std::mutex global_mu;
int global_num_connections = 0;
int global_num_calls = 0;

// --- Connection counter ----------------------------------------------------

// Adds one to the connection counter.
void IncrementConnectionCounter() {
  std::lock_guard<std::mutex> guard(global_mu);
  global_num_connections += 1;
}

// Sets the connection counter back to zero.
void ResetConnectionCounter() {
  std::lock_guard<std::mutex> guard(global_mu);
  global_num_connections = 0;
}

// Returns the current value of the connection counter.
int GetConnectionCounterValue() {
  std::lock_guard<std::mutex> guard(global_mu);
  const int snapshot = global_num_connections;
  return snapshot;
}

// --- Call counter ----------------------------------------------------------

// Adds one to the call counter.
void IncrementCallCounter() {
  std::lock_guard<std::mutex> guard(global_mu);
  global_num_calls += 1;
}

// Sets the call counter back to zero.
void ResetCallCounter() {
  std::lock_guard<std::mutex> guard(global_mu);
  global_num_calls = 0;
}

// Returns the current value of the call counter.
int GetCallCounterValue() {
  std::lock_guard<std::mutex> guard(global_mu);
  const int snapshot = global_num_calls;
  return snapshot;
}

}  // namespace
102
103 class ChannelDataImpl : public ChannelData {
104  public:
105   grpc_error* Init(grpc_channel_element* /*elem*/,
106                    grpc_channel_element_args* /*args*/) override {
107     IncrementConnectionCounter();
108     return GRPC_ERROR_NONE;
109   }
110 };
111
112 class CallDataImpl : public CallData {
113  public:
114   void StartTransportStreamOpBatch(grpc_call_element* elem,
115                                    TransportStreamOpBatch* op) override {
116     // Incrementing the counter could be done from Init(), but we want
117     // to test that the individual methods are actually called correctly.
118     if (op->recv_initial_metadata() != nullptr) IncrementCallCounter();
119     grpc_call_next_op(elem, op->op());
120   }
121 };
122
123 class FilterEnd2endTest : public ::testing::Test {
124  protected:
125   FilterEnd2endTest() : server_host_("localhost") {}
126
127   static void SetUpTestCase() {
128     // Workaround for
129     // https://github.com/google/google-toolbox-for-mac/issues/242
130     static bool setup_done = false;
131     if (!setup_done) {
132       setup_done = true;
133       grpc::RegisterChannelFilter<ChannelDataImpl, CallDataImpl>(
134           "test-filter", GRPC_SERVER_CHANNEL, INT_MAX, nullptr);
135     }
136   }
137
138   void SetUp() override {
139     int port = grpc_pick_unused_port_or_die();
140     server_address_ << server_host_ << ":" << port;
141     // Setup server
142     ServerBuilder builder;
143     builder.AddListeningPort(server_address_.str(),
144                              InsecureServerCredentials());
145     builder.RegisterAsyncGenericService(&generic_service_);
146     srv_cq_ = builder.AddCompletionQueue();
147     server_ = builder.BuildAndStart();
148   }
149
150   void TearDown() override {
151     server_->Shutdown();
152     void* ignored_tag;
153     bool ignored_ok;
154     cli_cq_.Shutdown();
155     srv_cq_->Shutdown();
156     while (cli_cq_.Next(&ignored_tag, &ignored_ok))
157       ;
158     while (srv_cq_->Next(&ignored_tag, &ignored_ok))
159       ;
160   }
161
162   void ResetStub() {
163     std::shared_ptr<Channel> channel = grpc::CreateChannel(
164         server_address_.str(), InsecureChannelCredentials());
165     generic_stub_ = absl::make_unique<GenericStub>(channel);
166     ResetConnectionCounter();
167     ResetCallCounter();
168   }
169
170   void server_ok(int i) { verify_ok(srv_cq_.get(), i, true); }
171   void client_ok(int i) { verify_ok(&cli_cq_, i, true); }
172   void server_fail(int i) { verify_ok(srv_cq_.get(), i, false); }
173   void client_fail(int i) { verify_ok(&cli_cq_, i, false); }
174
175   void SendRpc(int num_rpcs) {
176     const std::string kMethodName("/grpc.cpp.test.util.EchoTestService/Echo");
177     for (int i = 0; i < num_rpcs; i++) {
178       EchoRequest send_request;
179       EchoRequest recv_request;
180       EchoResponse send_response;
181       EchoResponse recv_response;
182       Status recv_status;
183
184       ClientContext cli_ctx;
185       GenericServerContext srv_ctx;
186       GenericServerAsyncReaderWriter stream(&srv_ctx);
187
188       // The string needs to be long enough to test heap-based slice.
189       send_request.set_message("Hello world. Hello world. Hello world.");
190       std::thread request_call([this]() { server_ok(4); });
191       std::unique_ptr<GenericClientAsyncReaderWriter> call =
192           generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
193       call->StartCall(tag(1));
194       client_ok(1);
195       std::unique_ptr<ByteBuffer> send_buffer =
196           SerializeToByteBuffer(&send_request);
197       call->Write(*send_buffer, tag(2));
198       // Send ByteBuffer can be destroyed after calling Write.
199       send_buffer.reset();
200       client_ok(2);
201       call->WritesDone(tag(3));
202       client_ok(3);
203
204       generic_service_.RequestCall(&srv_ctx, &stream, srv_cq_.get(),
205                                    srv_cq_.get(), tag(4));
206
207       request_call.join();
208       EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
209       EXPECT_EQ(kMethodName, srv_ctx.method());
210       ByteBuffer recv_buffer;
211       stream.Read(&recv_buffer, tag(5));
212       server_ok(5);
213       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
214       EXPECT_EQ(send_request.message(), recv_request.message());
215
216       send_response.set_message(recv_request.message());
217       send_buffer = SerializeToByteBuffer(&send_response);
218       stream.Write(*send_buffer, tag(6));
219       send_buffer.reset();
220       server_ok(6);
221
222       stream.Finish(Status::OK, tag(7));
223       server_ok(7);
224
225       recv_buffer.Clear();
226       call->Read(&recv_buffer, tag(8));
227       client_ok(8);
228       EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
229
230       call->Finish(&recv_status, tag(9));
231       client_ok(9);
232
233       EXPECT_EQ(send_response.message(), recv_response.message());
234       EXPECT_TRUE(recv_status.ok());
235     }
236   }
237
238   CompletionQueue cli_cq_;
239   std::unique_ptr<ServerCompletionQueue> srv_cq_;
240   std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
241   std::unique_ptr<grpc::GenericStub> generic_stub_;
242   std::unique_ptr<Server> server_;
243   AsyncGenericService generic_service_;
244   const std::string server_host_;
245   std::ostringstream server_address_;
246 };
247
248 TEST_F(FilterEnd2endTest, SimpleRpc) {
249   ResetStub();
250   EXPECT_EQ(0, GetConnectionCounterValue());
251   EXPECT_EQ(0, GetCallCounterValue());
252   SendRpc(1);
253   EXPECT_EQ(1, GetConnectionCounterValue());
254   EXPECT_EQ(1, GetCallCounterValue());
255 }
256
257 TEST_F(FilterEnd2endTest, SequentialRpcs) {
258   ResetStub();
259   EXPECT_EQ(0, GetConnectionCounterValue());
260   EXPECT_EQ(0, GetCallCounterValue());
261   SendRpc(10);
262   EXPECT_EQ(1, GetConnectionCounterValue());
263   EXPECT_EQ(10, GetCallCounterValue());
264 }
265
266 // One ping, one pong.
267 TEST_F(FilterEnd2endTest, SimpleBidiStreaming) {
268   ResetStub();
269   EXPECT_EQ(0, GetConnectionCounterValue());
270   EXPECT_EQ(0, GetCallCounterValue());
271
272   const std::string kMethodName(
273       "/grpc.cpp.test.util.EchoTestService/BidiStream");
274   EchoRequest send_request;
275   EchoRequest recv_request;
276   EchoResponse send_response;
277   EchoResponse recv_response;
278   Status recv_status;
279   ClientContext cli_ctx;
280   GenericServerContext srv_ctx;
281   GenericServerAsyncReaderWriter srv_stream(&srv_ctx);
282
283   cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
284   send_request.set_message("Hello");
285   std::thread request_call([this]() { server_ok(2); });
286   std::unique_ptr<GenericClientAsyncReaderWriter> cli_stream =
287       generic_stub_->PrepareCall(&cli_ctx, kMethodName, &cli_cq_);
288   cli_stream->StartCall(tag(1));
289   client_ok(1);
290
291   generic_service_.RequestCall(&srv_ctx, &srv_stream, srv_cq_.get(),
292                                srv_cq_.get(), tag(2));
293
294   request_call.join();
295   EXPECT_EQ(server_host_, srv_ctx.host().substr(0, server_host_.length()));
296   EXPECT_EQ(kMethodName, srv_ctx.method());
297
298   std::unique_ptr<ByteBuffer> send_buffer =
299       SerializeToByteBuffer(&send_request);
300   cli_stream->Write(*send_buffer, tag(3));
301   send_buffer.reset();
302   client_ok(3);
303
304   ByteBuffer recv_buffer;
305   srv_stream.Read(&recv_buffer, tag(4));
306   server_ok(4);
307   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_request));
308   EXPECT_EQ(send_request.message(), recv_request.message());
309
310   send_response.set_message(recv_request.message());
311   send_buffer = SerializeToByteBuffer(&send_response);
312   srv_stream.Write(*send_buffer, tag(5));
313   send_buffer.reset();
314   server_ok(5);
315
316   cli_stream->Read(&recv_buffer, tag(6));
317   client_ok(6);
318   EXPECT_TRUE(ParseFromByteBuffer(&recv_buffer, &recv_response));
319   EXPECT_EQ(send_response.message(), recv_response.message());
320
321   cli_stream->WritesDone(tag(7));
322   client_ok(7);
323
324   srv_stream.Read(&recv_buffer, tag(8));
325   server_fail(8);
326
327   srv_stream.Finish(Status::OK, tag(9));
328   server_ok(9);
329
330   cli_stream->Finish(&recv_status, tag(10));
331   client_ok(10);
332
333   EXPECT_EQ(send_response.message(), recv_response.message());
334   EXPECT_TRUE(recv_status.ok());
335
336   EXPECT_EQ(1, GetCallCounterValue());
337   EXPECT_EQ(1, GetConnectionCounterValue());
338 }
339
340 }  // namespace
341 }  // namespace testing
342 }  // namespace grpc
343
344 int main(int argc, char** argv) {
345   grpc::testing::TestEnvironment env(argc, argv);
346   ::testing::InitGoogleTest(&argc, argv);
347   return RUN_ALL_TESTS();
348 }