014bb1d0cad433aeb6f779a166f164f7dabcf503
[platform/upstream/grpc.git] / test / core / transport / byte_stream_test.cc
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #include "src/core/lib/transport/byte_stream.h"
20
21 #include <gtest/gtest.h>
22
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/iomgr/exec_ctx.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "test/core/util/test_config.h"
31
32 namespace grpc_core {
33 namespace {
34
35 //
36 // SliceBufferByteStream tests
37 //
38
39 void NotCalledClosure(void* /*arg*/, grpc_error_handle /*error*/) {
40   GPR_ASSERT(false);
41 }
42
43 TEST(SliceBufferByteStream, Basic) {
44   grpc_core::ExecCtx exec_ctx;
45   // Create and populate slice buffer.
46   grpc_slice_buffer buffer;
47   grpc_slice_buffer_init(&buffer);
48   grpc_slice input[] = {
49       grpc_slice_from_static_string("foo"),
50       grpc_slice_from_static_string("bar"),
51   };
52   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
53     grpc_slice_buffer_add(&buffer, input[i]);
54   }
55   // Create byte stream.
56   SliceBufferByteStream stream(&buffer, 0);
57   grpc_slice_buffer_destroy_internal(&buffer);
58   EXPECT_EQ(6U, stream.length());
59   grpc_closure closure;
60   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
61                     grpc_schedule_on_exec_ctx);
62   // Read each slice.  Note that Next() always returns synchronously.
63   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
64     ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
65     grpc_slice output;
66     grpc_error_handle error = stream.Pull(&output);
67     EXPECT_TRUE(error == GRPC_ERROR_NONE);
68     EXPECT_TRUE(grpc_slice_eq(input[i], output));
69     grpc_slice_unref_internal(output);
70   }
71   // Clean up.
72   stream.Orphan();
73 }
74
75 TEST(SliceBufferByteStream, Shutdown) {
76   grpc_core::ExecCtx exec_ctx;
77   // Create and populate slice buffer.
78   grpc_slice_buffer buffer;
79   grpc_slice_buffer_init(&buffer);
80   grpc_slice input[] = {
81       grpc_slice_from_static_string("foo"),
82       grpc_slice_from_static_string("bar"),
83   };
84   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
85     grpc_slice_buffer_add(&buffer, input[i]);
86   }
87   // Create byte stream.
88   SliceBufferByteStream stream(&buffer, 0);
89   grpc_slice_buffer_destroy_internal(&buffer);
90   EXPECT_EQ(6U, stream.length());
91   grpc_closure closure;
92   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
93                     grpc_schedule_on_exec_ctx);
94   // Read the first slice.
95   ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
96   grpc_slice output;
97   grpc_error_handle error = stream.Pull(&output);
98   EXPECT_TRUE(error == GRPC_ERROR_NONE);
99   EXPECT_TRUE(grpc_slice_eq(input[0], output));
100   grpc_slice_unref_internal(output);
101   // Now shutdown.
102   grpc_error_handle shutdown_error =
103       GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
104   stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
105   // After shutdown, the next pull() should return the error.
106   ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
107   error = stream.Pull(&output);
108   EXPECT_TRUE(error == shutdown_error);
109   GRPC_ERROR_UNREF(error);
110   GRPC_ERROR_UNREF(shutdown_error);
111   // Clean up.
112   stream.Orphan();
113 }
114
115 //
116 // CachingByteStream tests
117 //
118
119 TEST(CachingByteStream, Basic) {
120   grpc_core::ExecCtx exec_ctx;
121   // Create and populate slice buffer byte stream.
122   grpc_slice_buffer buffer;
123   grpc_slice_buffer_init(&buffer);
124   grpc_slice input[] = {
125       grpc_slice_from_static_string("foo"),
126       grpc_slice_from_static_string("bar"),
127   };
128   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
129     grpc_slice_buffer_add(&buffer, input[i]);
130   }
131   SliceBufferByteStream underlying_stream(&buffer, 0);
132   grpc_slice_buffer_destroy_internal(&buffer);
133   // Create cache and caching stream.
134   ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
135   ByteStreamCache::CachingByteStream stream(&cache);
136   grpc_closure closure;
137   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
138                     grpc_schedule_on_exec_ctx);
139   // Read each slice.  Note that next() always returns synchronously,
140   // because the underlying byte stream always does.
141   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
142     ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
143     grpc_slice output;
144     grpc_error_handle error = stream.Pull(&output);
145     EXPECT_TRUE(error == GRPC_ERROR_NONE);
146     EXPECT_TRUE(grpc_slice_eq(input[i], output));
147     grpc_slice_unref_internal(output);
148   }
149   // Clean up.
150   stream.Orphan();
151   cache.Destroy();
152 }
153
154 TEST(CachingByteStream, Reset) {
155   grpc_core::ExecCtx exec_ctx;
156   // Create and populate slice buffer byte stream.
157   grpc_slice_buffer buffer;
158   grpc_slice_buffer_init(&buffer);
159   grpc_slice input[] = {
160       grpc_slice_from_static_string("foo"),
161       grpc_slice_from_static_string("bar"),
162   };
163   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
164     grpc_slice_buffer_add(&buffer, input[i]);
165   }
166   SliceBufferByteStream underlying_stream(&buffer, 0);
167   grpc_slice_buffer_destroy_internal(&buffer);
168   // Create cache and caching stream.
169   ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
170   ByteStreamCache::CachingByteStream stream(&cache);
171   grpc_closure closure;
172   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
173                     grpc_schedule_on_exec_ctx);
174   // Read one slice.
175   ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
176   grpc_slice output;
177   grpc_error_handle error = stream.Pull(&output);
178   EXPECT_TRUE(error == GRPC_ERROR_NONE);
179   EXPECT_TRUE(grpc_slice_eq(input[0], output));
180   grpc_slice_unref_internal(output);
181   // Reset the caching stream.  The reads should start over from the
182   // first slice.
183   stream.Reset();
184   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
185     ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
186     error = stream.Pull(&output);
187     EXPECT_TRUE(error == GRPC_ERROR_NONE);
188     EXPECT_TRUE(grpc_slice_eq(input[i], output));
189     grpc_slice_unref_internal(output);
190   }
191   // Clean up.
192   stream.Orphan();
193   cache.Destroy();
194 }
195
196 TEST(CachingByteStream, SharedCache) {
197   grpc_core::ExecCtx exec_ctx;
198   // Create and populate slice buffer byte stream.
199   grpc_slice_buffer buffer;
200   grpc_slice_buffer_init(&buffer);
201   grpc_slice input[] = {
202       grpc_slice_from_static_string("foo"),
203       grpc_slice_from_static_string("bar"),
204   };
205   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
206     grpc_slice_buffer_add(&buffer, input[i]);
207   }
208   SliceBufferByteStream underlying_stream(&buffer, 0);
209   grpc_slice_buffer_destroy_internal(&buffer);
210   // Create cache and two caching streams.
211   ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
212   ByteStreamCache::CachingByteStream stream1(&cache);
213   ByteStreamCache::CachingByteStream stream2(&cache);
214   grpc_closure closure;
215   GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
216                     grpc_schedule_on_exec_ctx);
217   // Read one slice from stream1.
218   EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
219   grpc_slice output;
220   grpc_error_handle error = stream1.Pull(&output);
221   EXPECT_TRUE(error == GRPC_ERROR_NONE);
222   EXPECT_TRUE(grpc_slice_eq(input[0], output));
223   grpc_slice_unref_internal(output);
224   // Read all slices from stream2.
225   for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
226     EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
227     error = stream2.Pull(&output);
228     EXPECT_TRUE(error == GRPC_ERROR_NONE);
229     EXPECT_TRUE(grpc_slice_eq(input[i], output));
230     grpc_slice_unref_internal(output);
231   }
232   // Now read the second slice from stream1.
233   EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
234   error = stream1.Pull(&output);
235   EXPECT_TRUE(error == GRPC_ERROR_NONE);
236   EXPECT_TRUE(grpc_slice_eq(input[1], output));
237   grpc_slice_unref_internal(output);
238   // Clean up.
239   stream1.Orphan();
240   stream2.Orphan();
241   cache.Destroy();
242 }
243
244 }  // namespace
245 }  // namespace grpc_core
246
247 int main(int argc, char** argv) {
248   grpc::testing::TestEnvironment env(argc, argv);
249   ::testing::InitGoogleTest(&argc, argv);
250   grpc_init();
251   int retval = RUN_ALL_TESTS();
252   grpc_shutdown();
253   return retval;
254 }