3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #include "src/core/lib/transport/byte_stream.h"
21 #include <grpc/grpc.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/log.h>
25 #include "src/core/lib/gpr/useful.h"
26 #include "src/core/lib/iomgr/exec_ctx.h"
27 #include "src/core/lib/slice/slice_internal.h"
29 #include "test/core/util/test_config.h"
31 #include <gtest/gtest.h>
37 // SliceBufferByteStream tests
// Closure callback that must never run: it ignores both arguments and
// unconditionally trips GPR_ASSERT(false). The tests in this file install it
// in the closure they pass to Next().
40 void NotCalledClosure(void* arg, grpc_error* error) { GPR_ASSERT(false); }
// Checks that a SliceBufferByteStream built from a buffer holding "foo" and
// "bar" reports a length of 6 and hands the two slices back in order through
// Next()/Pull().
// NOTE(review): `closure` and `output` are used below without a visible
// declaration in this test -- confirm both are declared before use.
42 TEST(SliceBufferByteStream, Basic) {
43 grpc_core::ExecCtx exec_ctx;
44 // Create and populate slice buffer.
45 grpc_slice_buffer buffer;
46 grpc_slice_buffer_init(&buffer);
47 grpc_slice input[] = {
48 grpc_slice_from_static_string("foo"),
49 grpc_slice_from_static_string("bar"),
51 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
52 grpc_slice_buffer_add(&buffer, input[i]);
54 // Create byte stream.
55 SliceBufferByteStream stream(&buffer, 0);
// The source buffer is destroyed before any read; every Pull() below happens
// after this point.
56 grpc_slice_buffer_destroy_internal(&buffer);
// 6 is the combined size of "foo" and "bar".
57 EXPECT_EQ(6U, stream.length());
// The closure is wired to NotCalledClosure, so the test asserts if it is ever
// run.
59 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
60 grpc_schedule_on_exec_ctx);
61 // Read each slice. Note that Next() always returns synchronously.
62 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
// ~(size_t)0 is the largest size_t value; presumably a "no limit" size hint
// for Next() -- confirm against the ByteStream interface.
63 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
65 grpc_error* error = stream.Pull(&output);
66 EXPECT_TRUE(error == GRPC_ERROR_NONE);
// The i-th pulled slice must compare equal to the i-th input slice.
67 EXPECT_TRUE(grpc_slice_eq(input[i], output));
// Drop the reference on the pulled slice before the next iteration.
68 grpc_slice_unref_internal(output);
// Checks that once Shutdown() has been called with an error, Next() still
// returns true and the following Pull() returns that same error object.
// NOTE(review): `closure` and `output` are used below without a visible
// declaration in this test -- confirm both are declared before use.
74 TEST(SliceBufferByteStream, Shutdown) {
75 grpc_core::ExecCtx exec_ctx;
76 // Create and populate slice buffer.
77 grpc_slice_buffer buffer;
78 grpc_slice_buffer_init(&buffer);
79 grpc_slice input[] = {
80 grpc_slice_from_static_string("foo"),
81 grpc_slice_from_static_string("bar"),
83 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
84 grpc_slice_buffer_add(&buffer, input[i]);
86 // Create byte stream.
87 SliceBufferByteStream stream(&buffer, 0);
// The source buffer is destroyed before any read.
88 grpc_slice_buffer_destroy_internal(&buffer);
// 6 is the combined size of "foo" and "bar".
89 EXPECT_EQ(6U, stream.length());
// The closure is wired to NotCalledClosure, so the test asserts if it is ever
// run.
91 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
92 grpc_schedule_on_exec_ctx);
93 // Read the first slice.
94 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
96 grpc_error* error = stream.Pull(&output);
97 EXPECT_TRUE(error == GRPC_ERROR_NONE);
98 EXPECT_TRUE(grpc_slice_eq(input[0], output));
99 grpc_slice_unref_internal(output);
// Shut the stream down with one slice still unread.
101 grpc_error* shutdown_error =
102 GRPC_ERROR_CREATE_FROM_STATIC_STRING("shutdown error");
// Shutdown() is handed an extra ref so that shutdown_error stays usable for
// the comparison and the explicit unref below. Presumably Shutdown() takes
// ownership of the ref it receives -- confirm.
103 stream.Shutdown(GRPC_ERROR_REF(shutdown_error));
104 // After shutdown, the next pull() should return the error.
105 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
106 error = stream.Pull(&output);
// Pointer comparison: Pull() must return the very error passed to Shutdown().
107 EXPECT_TRUE(error == shutdown_error);
// Release the ref returned by Pull() and the test's own ref.
108 GRPC_ERROR_UNREF(error);
109 GRPC_ERROR_UNREF(shutdown_error);
115 // CachingByteStream tests
// Checks that a CachingByteStream layered over a SliceBufferByteStream holding
// "foo" and "bar" hands the two slices back in order through Next()/Pull().
// NOTE(review): `output` is used below without a visible declaration in this
// test -- confirm it is declared before use.
118 TEST(CachingByteStream, Basic) {
119 grpc_core::ExecCtx exec_ctx;
120 // Create and populate slice buffer byte stream.
121 grpc_slice_buffer buffer;
122 grpc_slice_buffer_init(&buffer);
123 grpc_slice input[] = {
124 grpc_slice_from_static_string("foo"),
125 grpc_slice_from_static_string("bar"),
127 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
128 grpc_slice_buffer_add(&buffer, input[i]);
130 SliceBufferByteStream underlying_stream(&buffer, 0);
// The source buffer is destroyed before any read.
131 grpc_slice_buffer_destroy_internal(&buffer);
132 // Create cache and caching stream.
// The cache receives an OrphanablePtr wrapping the address of the local
// underlying_stream object.
// NOTE(review): presumably orphaning the stream does not free its storage,
// since the object is a local here -- confirm.
133 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
134 ByteStreamCache::CachingByteStream stream(&cache);
135 grpc_closure closure;
// The closure is wired to NotCalledClosure, so the test asserts if it is ever
// run.
136 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
137 grpc_schedule_on_exec_ctx);
138 // Read each slice. Note that Next() always returns synchronously,
139 // because the underlying byte stream always does.
140 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
141 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
143 grpc_error* error = stream.Pull(&output);
144 EXPECT_TRUE(error == GRPC_ERROR_NONE);
// The i-th pulled slice must compare equal to the i-th input slice.
145 EXPECT_TRUE(grpc_slice_eq(input[i], output));
// Drop the reference on the pulled slice before the next iteration.
146 grpc_slice_unref_internal(output);
// Reads the first slice through a CachingByteStream, then expects a full
// in-order read of both slices ("foo", "bar") from the same stream object.
// NOTE(review): `output` is used below without a visible declaration in this
// test -- confirm it is declared before use.
153 TEST(CachingByteStream, Reset) {
154 grpc_core::ExecCtx exec_ctx;
155 // Create and populate slice buffer byte stream.
156 grpc_slice_buffer buffer;
157 grpc_slice_buffer_init(&buffer);
158 grpc_slice input[] = {
159 grpc_slice_from_static_string("foo"),
160 grpc_slice_from_static_string("bar"),
162 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
163 grpc_slice_buffer_add(&buffer, input[i]);
165 SliceBufferByteStream underlying_stream(&buffer, 0);
// The source buffer is destroyed before any read.
166 grpc_slice_buffer_destroy_internal(&buffer);
167 // Create cache and caching stream.
// The cache receives an OrphanablePtr wrapping the address of the local
// underlying_stream object.
168 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
169 ByteStreamCache::CachingByteStream stream(&cache);
170 grpc_closure closure;
// The closure is wired to NotCalledClosure, so the test asserts if it is ever
// run.
171 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
172 grpc_schedule_on_exec_ctx);
// Consume the first slice so the stream is no longer at its start.
174 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
176 grpc_error* error = stream.Pull(&output);
177 EXPECT_TRUE(error == GRPC_ERROR_NONE);
178 EXPECT_TRUE(grpc_slice_eq(input[0], output));
179 grpc_slice_unref_internal(output);
180 // Reset the caching stream. The reads should start over from the
// beginning: the loop below expects input[0] first, then input[1].
// NOTE(review): no Reset() call on `stream` is visible between this comment
// and the loop -- confirm the stream is actually reset here.
183 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
184 ASSERT_TRUE(stream.Next(~(size_t)0, &closure));
185 error = stream.Pull(&output);
186 EXPECT_TRUE(error == GRPC_ERROR_NONE);
187 EXPECT_TRUE(grpc_slice_eq(input[i], output));
188 grpc_slice_unref_internal(output);
// Attaches two CachingByteStream objects to one ByteStreamCache and interleaves
// their reads: stream1 reads "foo", stream2 reads "foo" and "bar", and then
// stream1 is expected to read "bar". The expectations therefore require that
// stream2's reads do not advance stream1's position.
// NOTE(review): `output` is used below without a visible declaration in this
// test -- confirm it is declared before use.
195 TEST(CachingByteStream, SharedCache) {
196 grpc_core::ExecCtx exec_ctx;
197 // Create and populate slice buffer byte stream.
198 grpc_slice_buffer buffer;
199 grpc_slice_buffer_init(&buffer);
200 grpc_slice input[] = {
201 grpc_slice_from_static_string("foo"),
202 grpc_slice_from_static_string("bar"),
204 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
205 grpc_slice_buffer_add(&buffer, input[i]);
207 SliceBufferByteStream underlying_stream(&buffer, 0);
// The source buffer is destroyed before any read.
208 grpc_slice_buffer_destroy_internal(&buffer);
209 // Create cache and two caching streams.
// The cache receives an OrphanablePtr wrapping the address of the local
// underlying_stream object; both caching streams point at this one cache.
210 ByteStreamCache cache((OrphanablePtr<ByteStream>(&underlying_stream)));
211 ByteStreamCache::CachingByteStream stream1(&cache);
212 ByteStreamCache::CachingByteStream stream2(&cache);
213 grpc_closure closure;
// The closure is wired to NotCalledClosure, so the test asserts if it is ever
// run.
214 GRPC_CLOSURE_INIT(&closure, NotCalledClosure, nullptr,
215 grpc_schedule_on_exec_ctx);
216 // Read one slice from stream1.
// Next() is checked with EXPECT_TRUE throughout this test (the other tests in
// this file use ASSERT_TRUE), so a false return is recorded as a failure but
// the test continues on to Pull().
217 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
219 grpc_error* error = stream1.Pull(&output);
220 EXPECT_TRUE(error == GRPC_ERROR_NONE);
221 EXPECT_TRUE(grpc_slice_eq(input[0], output));
222 grpc_slice_unref_internal(output);
223 // Read all slices from stream2.
224 for (size_t i = 0; i < GPR_ARRAY_SIZE(input); ++i) {
225 EXPECT_TRUE(stream2.Next(~(size_t)0, &closure));
226 error = stream2.Pull(&output);
227 EXPECT_TRUE(error == GRPC_ERROR_NONE);
228 EXPECT_TRUE(grpc_slice_eq(input[i], output));
229 grpc_slice_unref_internal(output);
231 // Now read the second slice from stream1.
232 EXPECT_TRUE(stream1.Next(~(size_t)0, &closure));
233 error = stream1.Pull(&output);
234 EXPECT_TRUE(error == GRPC_ERROR_NONE);
// stream1 resumes at input[1], even though stream2 has already read both
// slices.
235 EXPECT_TRUE(grpc_slice_eq(input[1], output));
236 grpc_slice_unref_internal(output);
244 } // namespace grpc_core
246 int main(int argc, char** argv) {
248 grpc::testing::TestEnvironment env(argc, argv);
249 ::testing::InitGoogleTest(&argc, argv);
250 int retval = RUN_ALL_TESTS();