a2e202da1166760d8d1b0e1618ec1b8f4799309a
[platform/upstream/grpc.git] / test / cpp / end2end / context_allocator_end2end_test.cc
1 /*
2  *
3  * Copyright 2020 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include <grpc/impl/codegen/log.h>
20 #include <grpcpp/channel.h>
21 #include <grpcpp/client_context.h>
22 #include <grpcpp/create_channel.h>
23 #include <grpcpp/server.h>
24 #include <grpcpp/server_builder.h>
25 #include <grpcpp/server_context.h>
26 #include <grpcpp/support/client_callback.h>
27 #include <grpcpp/support/message_allocator.h>
28 #include <gtest/gtest.h>
29
30 #include <algorithm>
31 #include <atomic>
32 #include <condition_variable>
33 #include <functional>
34 #include <memory>
35 #include <mutex>
36 #include <sstream>
37 #include <thread>
38
39 #include "src/core/lib/iomgr/iomgr.h"
40 #include "src/proto/grpc/testing/echo.grpc.pb.h"
41 #include "test/core/util/port.h"
42 #include "test/core/util/test_config.h"
43 #include "test/cpp/end2end/test_service_impl.h"
44 #include "test/cpp/util/test_credentials_provider.h"
45
46 namespace grpc {
47 namespace testing {
48 namespace {
49
// Transport used between the test client and the test server.
enum class Protocol {
  INPROC,  // in-process channel obtained directly from the server
  TCP      // channel connected to a listening port on localhost
};
51
52 class TestScenario {
53  public:
54   TestScenario(Protocol protocol, const std::string& creds_type)
55       : protocol(protocol), credentials_type(creds_type) {}
56   void Log() const;
57   Protocol protocol;
58   const std::string credentials_type;
59 };
60
61 static std::ostream& operator<<(std::ostream& out,
62                                 const TestScenario& scenario) {
63   return out << "TestScenario{protocol="
64              << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
65              << "," << scenario.credentials_type << "}";
66 }
67
68 void TestScenario::Log() const {
69   std::ostringstream out;
70   out << *this;
71   gpr_log(GPR_INFO, "%s", out.str().c_str());
72 }
73
74 class ContextAllocatorEnd2endTestBase
75     : public ::testing::TestWithParam<TestScenario> {
76  protected:
77   static void SetUpTestCase() { grpc_init(); }
78   static void TearDownTestCase() { grpc_shutdown(); }
79   ContextAllocatorEnd2endTestBase() {}
80
81   ~ContextAllocatorEnd2endTestBase() override = default;
82
83   void SetUp() override { GetParam().Log(); }
84
85   void CreateServer(std::unique_ptr<grpc::ContextAllocator> context_allocator) {
86     ServerBuilder builder;
87
88     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
89         GetParam().credentials_type);
90     if (GetParam().protocol == Protocol::TCP) {
91       picked_port_ = grpc_pick_unused_port_or_die();
92       server_address_ << "localhost:" << picked_port_;
93       builder.AddListeningPort(server_address_.str(), server_creds);
94     }
95     builder.SetContextAllocator(std::move(context_allocator));
96     builder.RegisterService(&callback_service_);
97
98     server_ = builder.BuildAndStart();
99   }
100
101   void DestroyServer() {
102     if (server_) {
103       server_->Shutdown();
104       server_.reset();
105     }
106   }
107
108   void ResetStub() {
109     ChannelArguments args;
110     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
111         GetParam().credentials_type, &args);
112     switch (GetParam().protocol) {
113       case Protocol::TCP:
114         channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
115                                                channel_creds, args);
116         break;
117       case Protocol::INPROC:
118         channel_ = server_->InProcessChannel(args);
119         break;
120       default:
121         assert(false);
122     }
123     stub_ = EchoTestService::NewStub(channel_);
124   }
125
126   void TearDown() override {
127     DestroyServer();
128     if (picked_port_ > 0) {
129       grpc_recycle_unused_port(picked_port_);
130     }
131   }
132
133   void SendRpcs(int num_rpcs) {
134     std::string test_string("");
135     for (int i = 0; i < num_rpcs; i++) {
136       EchoRequest request;
137       EchoResponse response;
138       ClientContext cli_ctx;
139
140       test_string += std::string(1024, 'x');
141       request.set_message(test_string);
142       std::string val;
143       cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
144
145       std::mutex mu;
146       std::condition_variable cv;
147       bool done = false;
148       stub_->async()->Echo(
149           &cli_ctx, &request, &response,
150           [&request, &response, &done, &mu, &cv, val](Status s) {
151             GPR_ASSERT(s.ok());
152
153             EXPECT_EQ(request.message(), response.message());
154             std::lock_guard<std::mutex> l(mu);
155             done = true;
156             cv.notify_one();
157           });
158       std::unique_lock<std::mutex> l(mu);
159       while (!done) {
160         cv.wait(l);
161       }
162     }
163   }
164
165   int picked_port_{0};
166   std::shared_ptr<Channel> channel_;
167   std::unique_ptr<EchoTestService::Stub> stub_;
168   CallbackTestServiceImpl callback_service_;
169   std::unique_ptr<Server> server_;
170   std::ostringstream server_address_;
171 };
172
173 class DefaultContextAllocatorTest : public ContextAllocatorEnd2endTestBase {};
174
175 TEST_P(DefaultContextAllocatorTest, SimpleRpc) {
176   const int kRpcCount = 10;
177   CreateServer(nullptr);
178   ResetStub();
179   SendRpcs(kRpcCount);
180 }
181
182 class NullContextAllocatorTest : public ContextAllocatorEnd2endTestBase {
183  public:
184   class NullAllocator : public grpc::ContextAllocator {
185    public:
186     NullAllocator(std::atomic<int>* allocation_count,
187                   std::atomic<int>* deallocation_count)
188         : allocation_count_(allocation_count),
189           deallocation_count_(deallocation_count) {}
190     grpc::CallbackServerContext* NewCallbackServerContext() override {
191       allocation_count_->fetch_add(1, std::memory_order_relaxed);
192       return nullptr;
193     }
194
195     GenericCallbackServerContext* NewGenericCallbackServerContext() override {
196       allocation_count_->fetch_add(1, std::memory_order_relaxed);
197       return nullptr;
198     }
199
200     void Release(
201         grpc::CallbackServerContext* /*callback_server_context*/) override {
202       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
203     }
204
205     void Release(
206         GenericCallbackServerContext* /*generic_callback_server_context*/)
207         override {
208       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
209     }
210
211     std::atomic<int>* allocation_count_;
212     std::atomic<int>* deallocation_count_;
213   };
214 };
215
216 TEST_P(NullContextAllocatorTest, UnaryRpc) {
217   const int kRpcCount = 10;
218   std::atomic<int> allocation_count{0};
219   std::atomic<int> deallocation_count{0};
220   std::unique_ptr<NullAllocator> allocator(
221       new NullAllocator(&allocation_count, &deallocation_count));
222   CreateServer(std::move(allocator));
223   ResetStub();
224   SendRpcs(kRpcCount);
225   // messages_deallocaton_count is updated in Release after server side
226   // OnDone.
227   DestroyServer();
228   EXPECT_EQ(kRpcCount, allocation_count);
229   EXPECT_EQ(kRpcCount, deallocation_count);
230 }
231
232 class SimpleContextAllocatorTest : public ContextAllocatorEnd2endTestBase {
233  public:
234   class SimpleAllocator : public grpc::ContextAllocator {
235    public:
236     SimpleAllocator(std::atomic<int>* allocation_count,
237                     std::atomic<int>* deallocation_count)
238         : allocation_count_(allocation_count),
239           deallocation_count_(deallocation_count) {}
240     grpc::CallbackServerContext* NewCallbackServerContext() override {
241       allocation_count_->fetch_add(1, std::memory_order_relaxed);
242       return new grpc::CallbackServerContext();
243     }
244     GenericCallbackServerContext* NewGenericCallbackServerContext() override {
245       allocation_count_->fetch_add(1, std::memory_order_relaxed);
246       return new GenericCallbackServerContext();
247     }
248
249     void Release(
250         grpc::CallbackServerContext* callback_server_context) override {
251       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
252       delete callback_server_context;
253     }
254
255     void Release(GenericCallbackServerContext* generic_callback_server_context)
256         override {
257       deallocation_count_->fetch_add(1, std::memory_order_relaxed);
258       delete generic_callback_server_context;
259     }
260
261     std::atomic<int>* allocation_count_;
262     std::atomic<int>* deallocation_count_;
263   };
264 };
265
266 TEST_P(SimpleContextAllocatorTest, UnaryRpc) {
267   const int kRpcCount = 10;
268   std::atomic<int> allocation_count{0};
269   std::atomic<int> deallocation_count{0};
270   std::unique_ptr<SimpleAllocator> allocator(
271       new SimpleAllocator(&allocation_count, &deallocation_count));
272   CreateServer(std::move(allocator));
273   ResetStub();
274   SendRpcs(kRpcCount);
275   // messages_deallocaton_count is updated in Release after server side
276   // OnDone.
277   DestroyServer();
278   EXPECT_EQ(kRpcCount, allocation_count);
279   EXPECT_EQ(kRpcCount, deallocation_count);
280 }
281
282 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
283   std::vector<TestScenario> scenarios;
284   std::vector<std::string> credentials_types{
285       GetCredentialsProvider()->GetSecureCredentialsTypeList()};
286   auto insec_ok = [] {
287     // Only allow insecure credentials type when it is registered with the
288     // provider. User may create providers that do not have insecure.
289     return GetCredentialsProvider()->GetChannelCredentials(
290                kInsecureCredentialsType, nullptr) != nullptr;
291   };
292   if (test_insecure && insec_ok()) {
293     credentials_types.push_back(kInsecureCredentialsType);
294   }
295   GPR_ASSERT(!credentials_types.empty());
296
297   Protocol parr[]{Protocol::INPROC, Protocol::TCP};
298   for (Protocol p : parr) {
299     for (const auto& cred : credentials_types) {
300       if (p == Protocol::INPROC &&
301           (cred != kInsecureCredentialsType || !insec_ok())) {
302         continue;
303       }
304       scenarios.emplace_back(p, cred);
305     }
306   }
307   return scenarios;
308 }
309
// TODO(ddyihai): add client-streaming, server-streaming, and bidi-streaming
// tests.
312
313 INSTANTIATE_TEST_SUITE_P(DefaultContextAllocatorTest,
314                          DefaultContextAllocatorTest,
315                          ::testing::ValuesIn(CreateTestScenarios(true)));
316 INSTANTIATE_TEST_SUITE_P(NullContextAllocatorTest, NullContextAllocatorTest,
317                          ::testing::ValuesIn(CreateTestScenarios(true)));
318 INSTANTIATE_TEST_SUITE_P(SimpleContextAllocatorTest, SimpleContextAllocatorTest,
319                          ::testing::ValuesIn(CreateTestScenarios(true)));
320
321 }  // namespace
322 }  // namespace testing
323 }  // namespace grpc
324
325 int main(int argc, char** argv) {
326   grpc::testing::TestEnvironment env(argc, argv);
327   ::testing::InitGoogleTest(&argc, argv);
328   int ret = RUN_ALL_TESTS();
329   return ret;
330 }