3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #import <XCTest/XCTest.h>
21 #include "src/core/lib/iomgr/port.h"
25 #include <netinet/in.h>
27 #include <grpc/impl/codegen/sync.h>
28 #include <grpc/support/sync.h>
30 #include "src/core/lib/iomgr/endpoint.h"
31 #include "src/core/lib/iomgr/resolve_address.h"
32 #include "src/core/lib/iomgr/tcp_client.h"
33 #include "test/core/util/resource_user_util.h"
34 #include "test/core/util/test_config.h"
36 static const int kConnectTimeout = 5;
37 static const int kWriteTimeout = 5;
38 static const int kReadTimeout = 5;
40 static const int kBufferSize = 10000;
42 static const int kRunLoopTimeout = 1;
44 static void set_atm(void *arg, grpc_error_handle error) {
45 gpr_atm *p = static_cast<gpr_atm *>(arg);
46 gpr_atm_full_cas(p, -1, reinterpret_cast<gpr_atm>(error));
49 static void init_event_closure(grpc_closure *closure, gpr_atm *atm) {
51 GRPC_CLOSURE_INIT(closure, set_atm, static_cast<void *>(atm), grpc_schedule_on_exec_ctx);
54 static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
56 if (slices->length != buffer_len) {
60 for (int i = 0; i < slices->count; i++) {
61 grpc_slice slice = slices->slices[i];
62 if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
65 buffer += GRPC_SLICE_LENGTH(slice);
71 @interface CFStreamEndpointTests : XCTestCase
75 @implementation CFStreamEndpointTests {
80 - (BOOL)waitForEvent:(gpr_atm *)event timeout:(int)timeout {
81 grpc_core::ExecCtx::Get()->Flush();
83 NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kConnectTimeout];
84 while (gpr_atm_acq_load(event) == -1 && [deadline timeIntervalSinceNow] > 0) {
85 NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kRunLoopTimeout];
86 [[NSRunLoop mainRunLoop] runMode:NSDefaultRunLoopMode beforeDate:deadline];
89 return (gpr_atm_acq_load(event) != -1);
101 self.continueAfterFailure = NO;
103 // Set up CFStream connection before testing the endpoint
105 grpc_core::ExecCtx exec_ctx;
107 grpc_resolved_address resolved_addr;
108 struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
111 gpr_atm connected = -1;
114 gpr_log(GPR_DEBUG, "test_succeeds");
116 memset(&resolved_addr, 0, sizeof(resolved_addr));
117 resolved_addr.len = sizeof(struct sockaddr_in);
118 addr->sin_family = AF_INET;
120 /* create a phony server */
121 svr_fd = socket(AF_INET, SOCK_STREAM, 0);
122 XCTAssertGreaterThanOrEqual(svr_fd, 0);
123 XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0);
124 XCTAssertEqual(listen(svr_fd, 1), 0);
127 XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
128 init_event_closure(&done, &connected);
129 grpc_tcp_client_connect(&done, &ep_, grpc_slice_allocator_create_unlimited(), nullptr, nullptr,
130 &resolved_addr, GRPC_MILLIS_INF_FUTURE);
132 /* await the connection */
134 resolved_addr.len = sizeof(addr);
135 r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr),
136 reinterpret_cast<socklen_t *>(&resolved_addr.len));
137 } while (r == -1 && errno == EINTR);
138 XCTAssertGreaterThanOrEqual(r, 0);
141 /* wait for the connection callback to finish */
142 XCTAssertEqual([self waitForEvent:&connected timeout:kConnectTimeout], YES);
143 XCTAssertEqual(reinterpret_cast<grpc_error_handle>(connected), GRPC_ERROR_NONE);
147 grpc_core::ExecCtx exec_ctx;
149 grpc_endpoint_destroy(ep_);
152 - (void)testReadWrite {
153 grpc_core::ExecCtx exec_ctx;
156 grpc_closure read_done;
157 grpc_slice_buffer read_slices;
158 grpc_slice_buffer read_one_slice;
160 grpc_closure write_done;
161 grpc_slice_buffer write_slices;
164 char write_buffer[kBufferSize];
165 char read_buffer[kBufferSize];
166 size_t recv_size = 0;
168 grpc_slice_buffer_init(&write_slices);
169 slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
170 grpc_slice_buffer_add(&write_slices, slice);
171 init_event_closure(&write_done, &write);
172 grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
174 XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
175 XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
177 while (recv_size < kBufferSize) {
178 ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
179 XCTAssertGreaterThanOrEqual(size, 0);
183 XCTAssertEqual(recv_size, kBufferSize);
184 XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
185 ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
186 XCTAssertGreaterThanOrEqual(send_size, 0);
188 grpc_slice_buffer_init(&read_slices);
189 grpc_slice_buffer_init(&read_one_slice);
190 while (read_slices.length < kBufferSize) {
191 init_event_closure(&read_done, &read);
192 grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
193 XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
194 XCTAssertEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
195 grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
196 XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
198 XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
200 grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
201 grpc_slice_buffer_reset_and_unref(&read_slices);
202 grpc_slice_buffer_reset_and_unref(&write_slices);
203 grpc_slice_buffer_reset_and_unref(&read_one_slice);
206 - (void)testShutdownBeforeRead {
207 grpc_core::ExecCtx exec_ctx;
210 grpc_closure read_done;
211 grpc_slice_buffer read_slices;
213 grpc_closure write_done;
214 grpc_slice_buffer write_slices;
217 char write_buffer[kBufferSize];
218 char read_buffer[kBufferSize];
219 size_t recv_size = 0;
221 grpc_slice_buffer_init(&read_slices);
222 init_event_closure(&read_done, &read);
223 grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
225 grpc_slice_buffer_init(&write_slices);
226 slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
227 grpc_slice_buffer_add(&write_slices, slice);
228 init_event_closure(&write_done, &write);
229 grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
231 XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
232 XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
234 while (recv_size < kBufferSize) {
235 ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
236 XCTAssertGreaterThanOrEqual(size, 0);
240 XCTAssertEqual(recv_size, kBufferSize);
241 XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
243 XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], NO);
245 grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
247 grpc_core::ExecCtx::Get()->Flush();
248 XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
249 XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
251 grpc_slice_buffer_reset_and_unref(&read_slices);
252 grpc_slice_buffer_reset_and_unref(&write_slices);
255 - (void)testRemoteClosed {
256 grpc_core::ExecCtx exec_ctx;
259 grpc_closure read_done;
260 grpc_slice_buffer read_slices;
262 grpc_closure write_done;
263 grpc_slice_buffer write_slices;
266 char write_buffer[kBufferSize];
267 char read_buffer[kBufferSize];
268 size_t recv_size = 0;
270 init_event_closure(&read_done, &read);
271 grpc_slice_buffer_init(&read_slices);
272 grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
274 grpc_slice_buffer_init(&write_slices);
275 slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
276 grpc_slice_buffer_add(&write_slices, slice);
277 init_event_closure(&write_done, &write);
278 grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
280 XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
281 XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
283 while (recv_size < kBufferSize) {
284 ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
285 XCTAssertGreaterThanOrEqual(size, 0);
289 XCTAssertEqual(recv_size, kBufferSize);
290 XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
294 XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
295 XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
297 grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
298 grpc_slice_buffer_reset_and_unref(&read_slices);
299 grpc_slice_buffer_reset_and_unref(&write_slices);
302 - (void)testRemoteReset {
303 grpc_core::ExecCtx exec_ctx;
306 grpc_closure read_done;
307 grpc_slice_buffer read_slices;
309 init_event_closure(&read_done, &read);
310 grpc_slice_buffer_init(&read_slices);
311 grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
313 struct linger so_linger;
314 so_linger.l_onoff = 1;
315 so_linger.l_linger = 0;
316 setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
320 XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
321 XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
323 grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
324 grpc_slice_buffer_reset_and_unref(&read_slices);
329 #else // GRPC_CFSTREAM
332 @interface CFStreamEndpointTests : XCTestCase
335 @implementation CFStreamEndpointTests
346 #endif // GRPC_CFSTREAM