Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / test / core / transport / byte_stream_test.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include "src/core/lib/transport/byte_stream.h"
20
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
24
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/iomgr/exec_ctx.h"
27 #include "src/core/lib/slice/slice_internal.h"
28
29 #include "test/core/util/test_config.h"
30
31 #include <gtest/gtest.h>
32
33 namespace grpc_core {
34 namespace {
35
36 //
37 // SliceBufferByteStream tests
38 //
39
40 void NotCalledClosure(void* /*arg*/, grpc_error* /*error*/) {
41   GPR_ASSERT(false);
42 }
43
44 TEST(SliceBufferByteStream, Basic) {
45   grpc_core::ExecCtx exec_ctx;
46   // Create and populate slice buffer.
47   grpc_slice_buffer buffer;
48   grpc_slice_buffer_init(&buffer);
49   grpc_slice input[] = {
50       grpc_slice_from_static_string("foo"),
51       grpc_slice_from_static_string("bar"),
52   };
53   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
54     grpc_slice_buffer_add(&buffer, input[i]);
55   }
56   // Create byte stream.
57   SliceBufferByteStream stream(&buffer, 0);
58   grpc_slice_buffer_destroy_internal(&buffer);
59   EXPECT_EQ(6U, stream.length());
60   grpc_closure closure;
61   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
62                     grpc_schedule_on_exec_ctx);
63   // Read each slice.  Note that Next() always returns synchronously.
64   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
65     ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
66     grpc_slice output;
67     grpc_error* error = stream.Pull(&output);
68     EXPECT_TRUE(error == GRPC_ERROR_NONE);
69     EXPECT_TRUE(grpc_slice_eq(input[i], output));
70     grpc_slice_unref_internal(output);
71   }
72   // Clean up.
73   stream.Orphan();
74 }
75
76 TEST(SliceBufferByteStream, Shutdown) {
77   grpc_core::ExecCtx exec_ctx;
78   // Create and populate slice buffer.
79   grpc_slice_buffer buffer;
80   grpc_slice_buffer_init(&buffer);
81   grpc_slice input[] = {
82       grpc_slice_from_static_string("foo"),
83       grpc_slice_from_static_string("bar"),
84   };
85   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
86     grpc_slice_buffer_add(&buffer, input[i]);
87   }
88   // Create byte stream.
89   SliceBufferByteStream stream(&buffer, 0);
90   grpc_slice_buffer_destroy_internal(&buffer);
91   EXPECT_EQ(6U, stream.length());
92   grpc_closure closure;
93   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
94                     grpc_schedule_on_exec_ctx);
95   // Read the first slice.
96   ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
97   grpc_slice output;
98   grpc_error* error = stream.Pull(&output);
99   EXPECT_TRUE(error == GRPC_ERROR_NONE);
100   EXPECT_TRUE(grpc_slice_eq(input[0], output));
101   grpc_slice_unref_internal(output);
102   // Now shutdown.
103   grpc_error* shutdown_error =
104       GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
105   stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
106   // After shutdown, the next pull() should return the error.
107   ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
108   error = stream.Pull(&output);
109   EXPECT_TRUE(error == shutdown_error);
110   GRPC_ERROR_UNREF(error);
111   GRPC_ERROR_UNREF(shutdown_error);
112   // Clean up.
113   stream.Orphan();
114 }
115
116 //
117 // CachingByteStream tests
118 //
119
120 TEST(CachingByteStream, Basic) {
121   grpc_core::ExecCtx exec_ctx;
122   // Create and populate slice buffer byte stream.
123   grpc_slice_buffer buffer;
124   grpc_slice_buffer_init(&buffer);
125   grpc_slice input[] = {
126       grpc_slice_from_static_string("foo"),
127       grpc_slice_from_static_string("bar"),
128   };
129   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
130     grpc_slice_buffer_add(&buffer, input[i]);
131   }
132   SliceBufferByteStream underlying_stream(&buffer, 0);
133   grpc_slice_buffer_destroy_internal(&buffer);
134   // Create cache and caching stream.
135   ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
136   ByteStreamCache::CachingByteStream stream(&cache);
137   grpc_closure closure;
138   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
139                     grpc_schedule_on_exec_ctx);
140   // Read each slice.  Note that next() always returns synchronously,
141   // because the underlying byte stream always does.
142   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
143     ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
144     grpc_slice output;
145     grpc_error* error = stream.Pull(&output);
146     EXPECT_TRUE(error == GRPC_ERROR_NONE);
147     EXPECT_TRUE(grpc_slice_eq(input[i], output));
148     grpc_slice_unref_internal(output);
149   }
150   // Clean up.
151   stream.Orphan();
152   cache.Destroy();
153 }
154
155 TEST(CachingByteStream, Reset) {
156   grpc_core::ExecCtx exec_ctx;
157   // Create and populate slice buffer byte stream.
158   grpc_slice_buffer buffer;
159   grpc_slice_buffer_init(&buffer);
160   grpc_slice input[] = {
161       grpc_slice_from_static_string("foo"),
162       grpc_slice_from_static_string("bar"),
163   };
164   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
165     grpc_slice_buffer_add(&buffer, input[i]);
166   }
167   SliceBufferByteStream underlying_stream(&buffer, 0);
168   grpc_slice_buffer_destroy_internal(&buffer);
169   // Create cache and caching stream.
170   ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
171   ByteStreamCache::CachingByteStream stream(&cache);
172   grpc_closure closure;
173   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
174                     grpc_schedule_on_exec_ctx);
175   // Read one slice.
176   ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
177   grpc_slice output;
178   grpc_error* error = stream.Pull(&output);
179   EXPECT_TRUE(error == GRPC_ERROR_NONE);
180   EXPECT_TRUE(grpc_slice_eq(input[0], output));
181   grpc_slice_unref_internal(output);
182   // Reset the caching stream.  The reads should start over from the
183   // first slice.
184   stream.Reset();
185   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
186     ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
187     error = stream.Pull(&output);
188     EXPECT_TRUE(error == GRPC_ERROR_NONE);
189     EXPECT_TRUE(grpc_slice_eq(input[i], output));
190     grpc_slice_unref_internal(output);
191   }
192   // Clean up.
193   stream.Orphan();
194   cache.Destroy();
195 }
196
197 TEST(CachingByteStream, SharedCache) {
198   grpc_core::ExecCtx exec_ctx;
199   // Create and populate slice buffer byte stream.
200   grpc_slice_buffer buffer;
201   grpc_slice_buffer_init(&buffer);
202   grpc_slice input[] = {
203       grpc_slice_from_static_string("foo"),
204       grpc_slice_from_static_string("bar"),
205   };
206   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
207     grpc_slice_buffer_add(&buffer, input[i]);
208   }
209   SliceBufferByteStream underlying_stream(&buffer, 0);
210   grpc_slice_buffer_destroy_internal(&buffer);
211   // Create cache and two caching streams.
212   ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
213   ByteStreamCache::CachingByteStream stream1(&cache);
214   ByteStreamCache::CachingByteStream stream2(&cache);
215   grpc_closure closure;
216   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
217                     grpc_schedule_on_exec_ctx);
218   // Read one slice from stream1.
219   EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
220   grpc_slice output;
221   grpc_error* error = stream1.Pull(&output);
222   EXPECT_TRUE(error == GRPC_ERROR_NONE);
223   EXPECT_TRUE(grpc_slice_eq(input[0], output));
224   grpc_slice_unref_internal(output);
225   // Read all slices from stream2.
226   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
227     EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
228     error = stream2.Pull(&output);
229     EXPECT_TRUE(error == GRPC_ERROR_NONE);
230     EXPECT_TRUE(grpc_slice_eq(input[i], output));
231     grpc_slice_unref_internal(output);
232   }
233   // Now read the second slice from stream1.
234   EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
235   error = stream1.Pull(&output);
236   EXPECT_TRUE(error == GRPC_ERROR_NONE);
237   EXPECT_TRUE(grpc_slice_eq(input[1], output));
238   grpc_slice_unref_internal(output);
239   // Clean up.
240   stream1.Orphan();
241   stream2.Orphan();
242   cache.Destroy();
243 }
244
245 }  // namespace
246 }  // namespace grpc_core
247
248 int main(int argc, char** argv) {
249   ::testing::InitGoogleTest(&argc, argv);
250   grpc::testing::TestEnvironment env(argc, argv);
251   grpc_init();
252   int retval = RUN_ALL_TESTS();
253   grpc_shutdown();
254   return retval;
255 }