3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "src/core/lib/transport/byte_stream.h"
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/iomgr/exec_ctx.h"
27 #include "src/core/lib/slice/slice_internal.h"
29 #include "test/core/util/test_config.h"
31 #include <gtest/gtest.h>
37 // SliceBufferByteStream tests
// Closure callback used to initialize the closures handed to Next() in the
// tests in this file. As the name suggests, it is presumably never meant to
// run: the tests note that Next() always returns synchronously.
40 void NotCalledClosure(void* /*arg*/, grpc_error_handle /*error*/) {
// Builds a SliceBufferByteStream from a buffer holding the slices "foo" and
// "bar", checks that the stream reports a length of 6, and then reads the
// slices back one at a time, expecting them in insertion order.
44 TEST(SliceBufferByteStream, Basic) {
45 grpc_core::ExecCtx exec_ctx;
46 // Create and populate slice buffer.
47 grpc_slice_buffer buffer;
48 grpc_slice_buffer_init(&buffer);
49 grpc_slice input[] = {
50 grpc_slice_from_static_string("foo"),
51 grpc_slice_from_static_string("bar"),
53 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
54 grpc_slice_buffer_add(&buffer, input[i]);
56 // Create byte stream.
57 SliceBufferByteStream stream(&buffer, 0);
// The source buffer is destroyed immediately; the reads below are still
// expected to succeed, so the stream presumably keeps its own hold on the
// slices (confirm against SliceBufferByteStream).
58 grpc_slice_buffer_destroy_internal(&buffer);
// 3 bytes for "foo" plus 3 bytes for "bar".
59 EXPECT_EQ(6U, stream.length());
// The closure is wired to NotCalledClosure because Next() is expected to
// complete synchronously (see the note below).
61 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
62 grpc_schedule_on_exec_ctx);
63 // Read each slice. Note that Next() always returns synchronously.
64 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
// ~(size_t)0 is the all-ones (largest) size_t value.
65 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
67 grpc_error_handle error = stream.Pull(&output);
68 EXPECT_TRUE(error == GRPC_ERROR_NONE);
// The i-th pull must yield the i-th slice that was added to the buffer.
69 EXPECT_TRUE(grpc_slice_eq(input[i], output));
70 grpc_slice_unref_internal(output);
// Reads the first slice from a SliceBufferByteStream normally, then shuts the
// stream down with an error and checks that the following Pull() returns that
// same error handle.
76 TEST(SliceBufferByteStream, Shutdown) {
77 grpc_core::ExecCtx exec_ctx;
78 // Create and populate slice buffer.
79 grpc_slice_buffer buffer;
80 grpc_slice_buffer_init(&buffer);
81 grpc_slice input[] = {
82 grpc_slice_from_static_string("foo"),
83 grpc_slice_from_static_string("bar"),
85 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
86 grpc_slice_buffer_add(&buffer, input[i]);
88 // Create byte stream.
89 SliceBufferByteStream stream(&buffer, 0);
90 grpc_slice_buffer_destroy_internal(&buffer);
// 3 bytes for "foo" plus 3 bytes for "bar".
91 EXPECT_EQ(6U, stream.length());
// The closure is wired to NotCalledClosure; it is handed to every Next()
// call below.
93 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
94 grpc_schedule_on_exec_ctx);
95 // Read the first slice.
96 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
98 grpc_error_handle error = stream.Pull(&output);
99 EXPECT_TRUE(error == GRPC_ERROR_NONE);
100 EXPECT_TRUE(grpc_slice_eq(input[0], output));
101 grpc_slice_unref_internal(output);
103 grpc_error_handle shutdown_error =
104 GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
// Shutdown() receives an extra ref so the test keeps its own handle for the
// comparison and unref below (presumably Shutdown() takes ownership of its
// argument — confirm).
105 stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
106 // After shutdown, the next Pull() should return the error.
// Next() itself is still expected to return true after shutdown.
107 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
108 error = stream.Pull(&output);
// Compared by handle: Pull() must hand back the very error given to
// Shutdown().
109 EXPECT_TRUE(error == shutdown_error);
// Drop the handle returned by Pull() and the test's own handle.
110 GRPC_ERROR_UNREF(error);
111 GRPC_ERROR_UNREF(shutdown_error);
117 // CachingByteStream tests
// Wraps a SliceBufferByteStream ("foo", "bar") in a ByteStreamCache and reads
// it through a CachingByteStream, expecting the same slices in the same order
// as the underlying stream was populated with.
120 TEST(CachingByteStream, Basic) {
121 grpc_core::ExecCtx exec_ctx;
122 // Create and populate slice buffer byte stream.
123 grpc_slice_buffer buffer;
124 grpc_slice_buffer_init(&buffer);
125 grpc_slice input[] = {
126 grpc_slice_from_static_string("foo"),
127 grpc_slice_from_static_string("bar"),
129 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
130 grpc_slice_buffer_add(&buffer, input[i]);
132 SliceBufferByteStream underlying_stream(&buffer, 0);
133 grpc_slice_buffer_destroy_internal(&buffer);
134 // Create cache and caching stream.
// The extra parentheses around the argument keep this line from being parsed
// as a function declaration. Note that underlying_stream is a stack object
// whose address is wrapped in an OrphanablePtr.
135 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
136 ByteStreamCache::CachingByteStream stream(&cache);
137 grpc_closure closure;
138 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
139 grpc_schedule_on_exec_ctx);
140 // Read each slice. Note that Next() always returns synchronously,
141 // because the underlying byte stream always does.
142 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
// ~(size_t)0 is the all-ones (largest) size_t value.
143 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
145 grpc_error_handle error = stream.Pull(&output);
146 EXPECT_TRUE(error == GRPC_ERROR_NONE);
// The i-th pull through the caching stream must match the i-th input slice.
147 EXPECT_TRUE(grpc_slice_eq(input[i], output));
148 grpc_slice_unref_internal(output);
// Reads one slice through a CachingByteStream and then, as described by the
// inline comment further down, resets the stream and expects a full pass over
// the input to yield every slice again starting from the first one.
155 TEST(CachingByteStream, Reset) {
156 grpc_core::ExecCtx exec_ctx;
157 // Create and populate slice buffer byte stream.
158 grpc_slice_buffer buffer;
159 grpc_slice_buffer_init(&buffer);
160 grpc_slice input[] = {
161 grpc_slice_from_static_string("foo"),
162 grpc_slice_from_static_string("bar"),
164 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
165 grpc_slice_buffer_add(&buffer, input[i]);
167 SliceBufferByteStream underlying_stream(&buffer, 0);
168 grpc_slice_buffer_destroy_internal(&buffer);
169 // Create cache and caching stream.
// The extra parentheses around the argument keep this line from being parsed
// as a function declaration.
170 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
171 ByteStreamCache::CachingByteStream stream(&cache);
172 grpc_closure closure;
173 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
174 grpc_schedule_on_exec_ctx);
// First read: expects input[0].
176 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
178 grpc_error_handle error = stream.Pull(&output);
179 EXPECT_TRUE(error == GRPC_ERROR_NONE);
180 EXPECT_TRUE(grpc_slice_eq(input[0], output));
181 grpc_slice_unref_internal(output);
182 // Reset the caching stream. The reads should start over from the
// This loop starts again at index 0, so input[0] is expected a second time
// even though it was already pulled above.
185 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
186 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
187 error = stream.Pull(&output);
188 EXPECT_TRUE(error == GRPC_ERROR_NONE);
189 EXPECT_TRUE(grpc_slice_eq(input[i], output));
190 grpc_slice_unref_internal(output);
// Attaches two CachingByteStreams to the same ByteStreamCache and interleaves
// their reads: stream1 reads one slice, stream2 reads everything, then stream1
// reads its second slice. The expectations require each stream to track its
// own read position while both observe the same slice contents.
197 TEST(CachingByteStream, SharedCache) {
198 grpc_core::ExecCtx exec_ctx;
199 // Create and populate slice buffer byte stream.
200 grpc_slice_buffer buffer;
201 grpc_slice_buffer_init(&buffer);
202 grpc_slice input[] = {
203 grpc_slice_from_static_string("foo"),
204 grpc_slice_from_static_string("bar"),
206 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
207 grpc_slice_buffer_add(&buffer, input[i]);
209 SliceBufferByteStream underlying_stream(&buffer, 0);
210 grpc_slice_buffer_destroy_internal(&buffer);
211 // Create cache and two caching streams.
// The extra parentheses around the argument keep this line from being parsed
// as a function declaration.
212 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
213 ByteStreamCache::CachingByteStream stream1(&cache);
214 ByteStreamCache::CachingByteStream stream2(&cache);
215 grpc_closure closure;
216 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
217 grpc_schedule_on_exec_ctx);
218 // Read one slice from stream1.
// Unlike the other tests in this file, the Next() results here are checked
// with EXPECT_TRUE rather than ASSERT_TRUE, so a false return is recorded as
// a failure without aborting the test.
219 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
221 grpc_error_handle error = stream1.Pull(&output);
222 EXPECT_TRUE(error == GRPC_ERROR_NONE);
223 EXPECT_TRUE(grpc_slice_eq(input[0], output));
224 grpc_slice_unref_internal(output);
225 // Read all slices from stream2.
// stream2 starts at input[0] even though stream1 has already consumed it.
226 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
227 EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
228 error = stream2.Pull(&output);
229 EXPECT_TRUE(error == GRPC_ERROR_NONE);
230 EXPECT_TRUE(grpc_slice_eq(input[i], output));
231 grpc_slice_unref_internal(output);
233 // Now read the second slice from stream1.
// stream1 resumes at input[1]; stream2's reads did not advance it.
234 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
235 error = stream1.Pull(&output);
236 EXPECT_TRUE(error == GRPC_ERROR_NONE);
237 EXPECT_TRUE(grpc_slice_eq(input[1], output));
238 grpc_slice_unref_internal(output);
246 } // namespace grpc_core
248 int main(int argc, char** argv) {
249 grpc::testing::TestEnvironment env(argc, argv);
250 ::testing::InitGoogleTest(&argc, argv);
252 int retval = RUN_ALL_TESTS();