Imported Upstream version 1.33.1
[platform/upstream/grpc.git] / test / cpp / end2end / message_allocator_end2end_test.cc
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <sstream>
26 #include <thread>
27
28 #include <google/protobuf/arena.h>
29
30 #include <grpc/impl/codegen/log.h>
31 #include <gtest/gtest.h>
32
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/create_channel.h>
36 #include <grpcpp/server.h>
37 #include <grpcpp/server_builder.h>
38 #include <grpcpp/server_context.h>
39 #include <grpcpp/support/client_callback.h>
40 #include <grpcpp/support/message_allocator.h>
41
42 #include "src/core/lib/iomgr/iomgr.h"
43 #include "src/proto/grpc/testing/echo.grpc.pb.h"
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46 #include "test/cpp/util/test_credentials_provider.h"
47
48 // MAYBE_SKIP_TEST is a macro to determine if this particular test configuration
49 // should be skipped based on a decision made at SetUp time. In particular, any
50 // callback tests can only be run if the iomgr can run in the background or if
51 // the transport is in-process.
52 #define MAYBE_SKIP_TEST \
53   do {                  \
54     if (do_not_test_) { \
55       return;           \
56     }                   \
57   } while (0)
58
59 namespace grpc {
60 namespace testing {
61 namespace {
62
63 class CallbackTestServiceImpl
64     : public EchoTestService::ExperimentalCallbackService {
65  public:
66   explicit CallbackTestServiceImpl() {}
67
68   void SetAllocatorMutator(
69       std::function<void(experimental::RpcAllocatorState* allocator_state,
70                          const EchoRequest* req, EchoResponse* resp)>
71           mutator) {
72     allocator_mutator_ = mutator;
73   }
74
75   experimental::ServerUnaryReactor* Echo(
76       experimental::CallbackServerContext* context, const EchoRequest* request,
77       EchoResponse* response) override {
78     response->set_message(request->message());
79     if (allocator_mutator_) {
80       allocator_mutator_(context->GetRpcAllocatorState(), request, response);
81     }
82     auto* reactor = context->DefaultReactor();
83     reactor->Finish(Status::OK);
84     return reactor;
85   }
86
87  private:
88   std::function<void(experimental::RpcAllocatorState* allocator_state,
89                      const EchoRequest* req, EchoResponse* resp)>
90       allocator_mutator_;
91 };
92
93 enum class Protocol { INPROC, TCP };
94
95 class TestScenario {
96  public:
97   TestScenario(Protocol protocol, const std::string& creds_type)
98       : protocol(protocol), credentials_type(creds_type) {}
99   void Log() const;
100   Protocol protocol;
101   const std::string credentials_type;
102 };
103
104 static std::ostream& operator<<(std::ostream& out,
105                                 const TestScenario& scenario) {
106   return out << "TestScenario{protocol="
107              << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
108              << "," << scenario.credentials_type << "}";
109 }
110
111 void TestScenario::Log() const {
112   std::ostringstream out;
113   out << *this;
114   gpr_log(GPR_INFO, "%s", out.str().c_str());
115 }
116
117 class MessageAllocatorEnd2endTestBase
118     : public ::testing::TestWithParam<TestScenario> {
119  protected:
120   MessageAllocatorEnd2endTestBase() {
121     GetParam().Log();
122     if (GetParam().protocol == Protocol::TCP) {
123       if (!grpc_iomgr_run_in_background()) {
124         do_not_test_ = true;
125         return;
126       }
127     }
128   }
129
130   ~MessageAllocatorEnd2endTestBase() = default;
131
132   void CreateServer(
133       experimental::MessageAllocator<EchoRequest, EchoResponse>* allocator) {
134     ServerBuilder builder;
135
136     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
137         GetParam().credentials_type);
138     if (GetParam().protocol == Protocol::TCP) {
139       picked_port_ = grpc_pick_unused_port_or_die();
140       server_address_ << "localhost:" << picked_port_;
141       builder.AddListeningPort(server_address_.str(), server_creds);
142     }
143     callback_service_.SetMessageAllocatorFor_Echo(allocator);
144     builder.RegisterService(&callback_service_);
145
146     server_ = builder.BuildAndStart();
147   }
148
149   void DestroyServer() {
150     if (server_) {
151       server_->Shutdown();
152       server_.reset();
153     }
154   }
155
156   void ResetStub() {
157     ChannelArguments args;
158     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
159         GetParam().credentials_type, &args);
160     switch (GetParam().protocol) {
161       case Protocol::TCP:
162         channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
163                                                channel_creds, args);
164         break;
165       case Protocol::INPROC:
166         channel_ = server_->InProcessChannel(args);
167         break;
168       default:
169         assert(false);
170     }
171     stub_ = EchoTestService::NewStub(channel_);
172   }
173
174   void TearDown() override {
175     DestroyServer();
176     if (picked_port_ > 0) {
177       grpc_recycle_unused_port(picked_port_);
178     }
179   }
180
181   void SendRpcs(int num_rpcs) {
182     std::string test_string("");
183     for (int i = 0; i < num_rpcs; i++) {
184       EchoRequest request;
185       EchoResponse response;
186       ClientContext cli_ctx;
187
188       test_string += std::string(1024, 'x');
189       request.set_message(test_string);
190       std::string val;
191       cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
192
193       std::mutex mu;
194       std::condition_variable cv;
195       bool done = false;
196       stub_->experimental_async()->Echo(
197           &cli_ctx, &request, &response,
198           [&request, &response, &done, &mu, &cv, val](Status s) {
199             GPR_ASSERT(s.ok());
200
201             EXPECT_EQ(request.message(), response.message());
202             std::lock_guard<std::mutex> l(mu);
203             done = true;
204             cv.notify_one();
205           });
206       std::unique_lock<std::mutex> l(mu);
207       while (!done) {
208         cv.wait(l);
209       }
210     }
211   }
212
213   bool do_not_test_{false};
214   int picked_port_{0};
215   std::shared_ptr<Channel> channel_;
216   std::unique_ptr<EchoTestService::Stub> stub_;
217   CallbackTestServiceImpl callback_service_;
218   std::unique_ptr<Server> server_;
219   std::ostringstream server_address_;
220 };
221
222 class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {};
223
224 TEST_P(NullAllocatorTest, SimpleRpc) {
225   MAYBE_SKIP_TEST;
226   CreateServer(nullptr);
227   ResetStub();
228   SendRpcs(1);
229 }
230
231 class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
232  public:
233   class SimpleAllocator
234       : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
235    public:
236     class MessageHolderImpl
237         : public experimental::MessageHolder<EchoRequest, EchoResponse> {
238      public:
239       MessageHolderImpl(std::atomic_int* request_deallocation_count,
240                         std::atomic_int* messages_deallocation_count)
241           : request_deallocation_count_(request_deallocation_count),
242             messages_deallocation_count_(messages_deallocation_count) {
243         set_request(new EchoRequest);
244         set_response(new EchoResponse);
245       }
246       void Release() override {
247         (*messages_deallocation_count_)++;
248         delete request();
249         delete response();
250         delete this;
251       }
252       void FreeRequest() override {
253         (*request_deallocation_count_)++;
254         delete request();
255         set_request(nullptr);
256       }
257
258       EchoRequest* ReleaseRequest() {
259         auto* ret = request();
260         set_request(nullptr);
261         return ret;
262       }
263
264      private:
265       std::atomic_int* const request_deallocation_count_;
266       std::atomic_int* const messages_deallocation_count_;
267     };
268     experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
269         override {
270       allocation_count++;
271       return new MessageHolderImpl(&request_deallocation_count,
272                                    &messages_deallocation_count);
273     }
274     int allocation_count = 0;
275     std::atomic_int request_deallocation_count{0};
276     std::atomic_int messages_deallocation_count{0};
277   };
278 };
279
280 TEST_P(SimpleAllocatorTest, SimpleRpc) {
281   MAYBE_SKIP_TEST;
282   const int kRpcCount = 10;
283   std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
284   CreateServer(allocator.get());
285   ResetStub();
286   SendRpcs(kRpcCount);
287   // messages_deallocaton_count is updated in Release after server side OnDone.
288   // Destroy server to make sure it has been updated.
289   DestroyServer();
290   EXPECT_EQ(kRpcCount, allocator->allocation_count);
291   EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
292   EXPECT_EQ(0, allocator->request_deallocation_count);
293 }
294
295 TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) {
296   MAYBE_SKIP_TEST;
297   const int kRpcCount = 10;
298   std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
299   auto mutator = [](experimental::RpcAllocatorState* allocator_state,
300                     const EchoRequest* req, EchoResponse* resp) {
301     auto* info =
302         static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
303     EXPECT_EQ(req, info->request());
304     EXPECT_EQ(resp, info->response());
305     allocator_state->FreeRequest();
306     EXPECT_EQ(nullptr, info->request());
307   };
308   callback_service_.SetAllocatorMutator(mutator);
309   CreateServer(allocator.get());
310   ResetStub();
311   SendRpcs(kRpcCount);
312   // messages_deallocaton_count is updated in Release after server side OnDone.
313   // Destroy server to make sure it has been updated.
314   DestroyServer();
315   EXPECT_EQ(kRpcCount, allocator->allocation_count);
316   EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
317   EXPECT_EQ(kRpcCount, allocator->request_deallocation_count);
318 }
319
320 TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
321   MAYBE_SKIP_TEST;
322   const int kRpcCount = 10;
323   std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
324   std::vector<EchoRequest*> released_requests;
325   auto mutator = [&released_requests](
326                      experimental::RpcAllocatorState* allocator_state,
327                      const EchoRequest* req, EchoResponse* resp) {
328     auto* info =
329         static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
330     EXPECT_EQ(req, info->request());
331     EXPECT_EQ(resp, info->response());
332     released_requests.push_back(info->ReleaseRequest());
333     EXPECT_EQ(nullptr, info->request());
334   };
335   callback_service_.SetAllocatorMutator(mutator);
336   CreateServer(allocator.get());
337   ResetStub();
338   SendRpcs(kRpcCount);
339   // messages_deallocaton_count is updated in Release after server side OnDone.
340   // Destroy server to make sure it has been updated.
341   DestroyServer();
342   EXPECT_EQ(kRpcCount, allocator->allocation_count);
343   EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
344   EXPECT_EQ(0, allocator->request_deallocation_count);
345   EXPECT_EQ(static_cast<unsigned>(kRpcCount), released_requests.size());
346   for (auto* req : released_requests) {
347     delete req;
348   }
349 }
350
351 class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
352  public:
353   class ArenaAllocator
354       : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
355    public:
356     class MessageHolderImpl
357         : public experimental::MessageHolder<EchoRequest, EchoResponse> {
358      public:
359       MessageHolderImpl() {
360         set_request(
361             google::protobuf::Arena::CreateMessage<EchoRequest>(&arena_));
362         set_response(
363             google::protobuf::Arena::CreateMessage<EchoResponse>(&arena_));
364       }
365       void Release() override { delete this; }
366       void FreeRequest() override { GPR_ASSERT(0); }
367
368      private:
369       google::protobuf::Arena arena_;
370     };
371     experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
372         override {
373       allocation_count++;
374       return new MessageHolderImpl;
375     }
376     int allocation_count = 0;
377   };
378 };
379
380 TEST_P(ArenaAllocatorTest, SimpleRpc) {
381   MAYBE_SKIP_TEST;
382   const int kRpcCount = 10;
383   std::unique_ptr<ArenaAllocator> allocator(new ArenaAllocator);
384   CreateServer(allocator.get());
385   ResetStub();
386   SendRpcs(kRpcCount);
387   EXPECT_EQ(kRpcCount, allocator->allocation_count);
388 }
389
390 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
391   std::vector<TestScenario> scenarios;
392   std::vector<std::string> credentials_types{
393       GetCredentialsProvider()->GetSecureCredentialsTypeList()};
394   auto insec_ok = [] {
395     // Only allow insecure credentials type when it is registered with the
396     // provider. User may create providers that do not have insecure.
397     return GetCredentialsProvider()->GetChannelCredentials(
398                kInsecureCredentialsType, nullptr) != nullptr;
399   };
400   if (test_insecure && insec_ok()) {
401     credentials_types.push_back(kInsecureCredentialsType);
402   }
403   GPR_ASSERT(!credentials_types.empty());
404
405   Protocol parr[]{Protocol::INPROC, Protocol::TCP};
406   for (Protocol p : parr) {
407     for (const auto& cred : credentials_types) {
408       // TODO(vjpai): Test inproc with secure credentials when feasible
409       if (p == Protocol::INPROC &&
410           (cred != kInsecureCredentialsType || !insec_ok())) {
411         continue;
412       }
413       scenarios.emplace_back(p, cred);
414     }
415   }
416   return scenarios;
417 }
418
419 INSTANTIATE_TEST_SUITE_P(NullAllocatorTest, NullAllocatorTest,
420                          ::testing::ValuesIn(CreateTestScenarios(true)));
421 INSTANTIATE_TEST_SUITE_P(SimpleAllocatorTest, SimpleAllocatorTest,
422                          ::testing::ValuesIn(CreateTestScenarios(true)));
423 INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest,
424                          ::testing::ValuesIn(CreateTestScenarios(true)));
425
426 }  // namespace
427 }  // namespace testing
428 }  // namespace grpc
429
430 int main(int argc, char** argv) {
431   grpc::testing::TestEnvironment env(argc, argv);
432   // The grpc_init is to cover the MAYBE_SKIP_TEST.
433   grpc_init();
434   ::testing::InitGoogleTest(&argc, argv);
435   int ret = RUN_ALL_TESTS();
436   grpc_shutdown();
437   return ret;
438 }