3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "src/core/lib/transport/byte_stream.h"
21 #include <gtest/gtest.h>
23 #include <grpc/grpc.h>
24 #include <grpc/support/alloc.h>
25 #include <grpc/support/log.h>
27 #include "src/core/lib/gpr/useful.h"
28 #include "src/core/lib/iomgr/exec_ctx.h"
29 #include "src/core/lib/slice/slice_internal.h"
30 #include "test/core/util/test_config.h"
36 // SliceBufferByteStream tests
// Closure callback handed to GRPC_CLOSURE_INIT by every test in this file.
// Both parameters are intentionally unnamed, i.e. unused by the callback.
// NOTE(review): judging by the name, this callback is expected never to
// run; its body is not visible here, so confirm what it does if invoked.
39 void NotCalledClosure(void* /*arg*/, grpc_error_handle /*error*/) {
// Checks plain sequential reads from a SliceBufferByteStream: a buffer
// holding the slices "foo" and "bar" must report a total length of 6 and
// hand the slices back one per Next()/Pull() pair, in insertion order.
43 TEST(SliceBufferByteStream, Basic) {
44 grpc_core::ExecCtx exec_ctx;
45 // Create and populate slice buffer.
46 grpc_slice_buffer buffer;
47 grpc_slice_buffer_init(&buffer);
48 grpc_slice input[] = {
49 grpc_slice_from_static_string("foo"),
50 grpc_slice_from_static_string("bar"),
52 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
53 grpc_slice_buffer_add(&buffer, input[i]);
55 // Create byte stream.
// NOTE(review): the meaning of the second constructor argument (0) is not
// shown here -- presumably stream flags; confirm against byte_stream.h.
56 SliceBufferByteStream stream(&buffer, 0);
// The source buffer is destroyed before any read takes place, so the stream
// presumably holds its own references to the slices -- TODO confirm.
57 grpc_slice_buffer_destroy_internal(&buffer);
// 6 == strlen("foo") + strlen("bar").
58 EXPECT_EQ(6U, stream.length());
// NOTE(review): `closure` is used below but no declaration of it is visible
// in this block.
60 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
61 grpc_schedule_on_exec_ctx);
62 // Read each slice. Note that Next() always returns synchronously.
63 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
// ~(size_t)0 is the largest size_t value, passed as the first argument of
// Next(). ASSERT_TRUE aborts the test if Next() does not return true.
64 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
// NOTE(review): `output` is used below but no declaration of it is visible
// in this block.
66 grpc_error_handle error = stream.Pull(&output);
67 EXPECT_TRUE(error == GRPC_ERROR_NONE);
68 EXPECT_TRUE(grpc_slice_eq(input[i], output));
// Drop the reference on the slice that Pull() handed back.
69 grpc_slice_unref_internal(output);
// Checks shutdown behaviour of a SliceBufferByteStream: after one
// successful read, Shutdown() is called with an error, and the following
// Pull() is expected to return that very same error handle.
75 TEST(SliceBufferByteStream, Shutdown) {
76 grpc_core::ExecCtx exec_ctx;
77 // Create and populate slice buffer.
78 grpc_slice_buffer buffer;
79 grpc_slice_buffer_init(&buffer);
80 grpc_slice input[] = {
81 grpc_slice_from_static_string("foo"),
82 grpc_slice_from_static_string("bar"),
84 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
85 grpc_slice_buffer_add(&buffer, input[i]);
87 // Create byte stream.
88 SliceBufferByteStream stream(&buffer, 0);
// The source buffer is destroyed before any read takes place, so the stream
// presumably holds its own references to the slices -- TODO confirm.
89 grpc_slice_buffer_destroy_internal(&buffer);
// 6 == strlen("foo") + strlen("bar").
90 EXPECT_EQ(6U, stream.length());
// NOTE(review): `closure` is used below but no declaration of it is visible
// in this block.
92 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
93 grpc_schedule_on_exec_ctx);
94 // Read the first slice.
95 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
// NOTE(review): `output` is used below but no declaration of it is visible
// in this block.
97 grpc_error_handle error = stream.Pull(&output);
98 EXPECT_TRUE(error == GRPC_ERROR_NONE);
99 EXPECT_TRUE(grpc_slice_eq(input[0], output));
100 grpc_slice_unref_internal(output);
102 grpc_error_handle shutdown_error =
103 GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
// An extra reference is passed to Shutdown() so the local handle stays
// usable for the comparison below; presumably the stream takes ownership of
// the reference it is given -- TODO confirm.
104 stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
105 // After shutdown, the next Pull() should return the error.
// Next() is still expected to return true here; the error only surfaces
// through the return value of Pull().
106 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
107 error = stream.Pull(&output);
// Compared by handle: Pull() must return the same error that was passed to
// Shutdown().
108 EXPECT_TRUE(error == shutdown_error);
// Release the handle returned by Pull() and the one created locally above.
109 GRPC_ERROR_UNREF(error);
110 GRPC_ERROR_UNREF(shutdown_error);
116 // CachingByteStream tests
// Checks plain sequential reads through a CachingByteStream layered on a
// ByteStreamCache that wraps a SliceBufferByteStream holding "foo" and
// "bar": each Next()/Pull() pair must yield the next input slice in order.
119 TEST(CachingByteStream, Basic) {
120 grpc_core::ExecCtx exec_ctx;
121 // Create and populate slice buffer byte stream.
122 grpc_slice_buffer buffer;
123 grpc_slice_buffer_init(&buffer);
124 grpc_slice input[] = {
125 grpc_slice_from_static_string("foo"),
126 grpc_slice_from_static_string("bar"),
128 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
129 grpc_slice_buffer_add(&buffer, input[i]);
131 SliceBufferByteStream underlying_stream(&buffer, 0);
// The source buffer is destroyed before any read takes place, so the stream
// presumably holds its own references to the slices -- TODO confirm.
132 grpc_slice_buffer_destroy_internal(&buffer);
133 // Create cache and caching stream.
// NOTE(review): the OrphanablePtr wraps the address of a local object; this
// is presumably safe only because orphaning a SliceBufferByteStream does not
// free its storage -- confirm against byte_stream.h.
134 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
135 ByteStreamCache::CachingByteStream stream(&cache);
136 grpc_closure closure;
137 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
138 grpc_schedule_on_exec_ctx);
139 // Read each slice. Note that Next() always returns synchronously,
140 // because the underlying byte stream always does.
141 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
// ~(size_t)0 is the largest size_t value, passed as the first argument of
// Next().
142 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
// NOTE(review): `output` is used below but no declaration of it is visible
// in this block.
144 grpc_error_handle error = stream.Pull(&output);
145 EXPECT_TRUE(error == GRPC_ERROR_NONE);
146 EXPECT_TRUE(grpc_slice_eq(input[i], output));
// Drop the reference on the slice that Pull() handed back.
147 grpc_slice_unref_internal(output);
// Checks that a CachingByteStream can be rewound: one slice is read first,
// and afterwards a full pass over the input is expected to start again at
// input[0] and yield every slice in order.
154 TEST(CachingByteStream, Reset) {
155 grpc_core::ExecCtx exec_ctx;
156 // Create and populate slice buffer byte stream.
157 grpc_slice_buffer buffer;
158 grpc_slice_buffer_init(&buffer);
159 grpc_slice input[] = {
160 grpc_slice_from_static_string("foo"),
161 grpc_slice_from_static_string("bar"),
163 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
164 grpc_slice_buffer_add(&buffer, input[i]);
166 SliceBufferByteStream underlying_stream(&buffer, 0);
// The source buffer is destroyed before any read takes place, so the stream
// presumably holds its own references to the slices -- TODO confirm.
167 grpc_slice_buffer_destroy_internal(&buffer);
168 // Create cache and caching stream.
// NOTE(review): the OrphanablePtr wraps the address of a local object; this
// is presumably safe only because orphaning a SliceBufferByteStream does not
// free its storage -- confirm against byte_stream.h.
169 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
170 ByteStreamCache::CachingByteStream stream(&cache);
171 grpc_closure closure;
172 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
173 grpc_schedule_on_exec_ctx);
// Read the first slice only, so that the stream has advanced past input[0].
175 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
// NOTE(review): `output` is used below but no declaration of it is visible
// in this block.
177 grpc_error_handle error = stream.Pull(&output);
178 EXPECT_TRUE(error == GRPC_ERROR_NONE);
179 EXPECT_TRUE(grpc_slice_eq(input[0], output));
180 grpc_slice_unref_internal(output);
181 // Reset the caching stream. The reads should start over from the beginning.
// NOTE(review): no call that performs the reset is visible between the
// comment above and the loop below -- confirm it is present in the file.
184 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
185 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
186 error = stream.Pull(&output);
187 EXPECT_TRUE(error == GRPC_ERROR_NONE);
// Index i starts at 0 again: input[0] is expected a second time.
188 EXPECT_TRUE(grpc_slice_eq(input[i], output));
189 grpc_slice_unref_internal(output);
// Checks that two CachingByteStreams attached to one ByteStreamCache keep
// independent read positions: stream1 reads the first slice, stream2 then
// reads both slices from the start, and stream1 afterwards still continues
// with the second slice.
196 TEST(CachingByteStream, SharedCache) {
197 grpc_core::ExecCtx exec_ctx;
198 // Create and populate slice buffer byte stream.
199 grpc_slice_buffer buffer;
200 grpc_slice_buffer_init(&buffer);
201 grpc_slice input[] = {
202 grpc_slice_from_static_string("foo"),
203 grpc_slice_from_static_string("bar"),
205 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
206 grpc_slice_buffer_add(&buffer, input[i]);
208 SliceBufferByteStream underlying_stream(&buffer, 0);
// The source buffer is destroyed before any read takes place, so the stream
// presumably holds its own references to the slices -- TODO confirm.
209 grpc_slice_buffer_destroy_internal(&buffer);
210 // Create cache and two caching streams.
// NOTE(review): the OrphanablePtr wraps the address of a local object; this
// is presumably safe only because orphaning a SliceBufferByteStream does not
// free its storage -- confirm against byte_stream.h.
211 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
212 ByteStreamCache::CachingByteStream stream1(&cache);
213 ByteStreamCache::CachingByteStream stream2(&cache);
214 grpc_closure closure;
215 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
216 grpc_schedule_on_exec_ctx);
217 // Read one slice from stream1.
// Unlike the other tests in this file, Next() is checked with EXPECT_TRUE
// here, so a false return is recorded without aborting the test.
218 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
// NOTE(review): `output` is used below but no declaration of it is visible
// in this block.
220 grpc_error_handle error = stream1.Pull(&output);
221 EXPECT_TRUE(error == GRPC_ERROR_NONE);
222 EXPECT_TRUE(grpc_slice_eq(input[0], output));
223 grpc_slice_unref_internal(output);
224 // Read all slices from stream2.
// stream2 is expected to start at input[0] even though stream1 has already
// consumed that slice.
225 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
226 EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
227 error = stream2.Pull(&output);
228 EXPECT_TRUE(error == GRPC_ERROR_NONE);
229 EXPECT_TRUE(grpc_slice_eq(input[i], output));
230 grpc_slice_unref_internal(output);
232 // Now read the second slice from stream1.
// stream1 resumes at input[1]; its position was not moved by the reads that
// went through stream2.
233 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
234 error = stream1.Pull(&output);
235 EXPECT_TRUE(error == GRPC_ERROR_NONE);
236 EXPECT_TRUE(grpc_slice_eq(input[1], output));
237 grpc_slice_unref_internal(output);
245 } // namespace grpc_core
// Test binary entry point: sets up the gRPC test environment, lets
// GoogleTest consume its command-line flags, then runs every registered
// test and keeps the result in `retval`.
// NOTE(review): the remainder of the function (including how `retval` is
// used) is not visible here.
247 int main(int argc, char** argv) {
248 grpc::testing::TestEnvironment env(argc, argv);
// InitGoogleTest receives &argc, so it may modify argc/argv.
249 ::testing::InitGoogleTest(&argc, argv);
251 int retval = RUN_ALL_TESTS();