Imported Upstream version 1.41.0
[platform/upstream/grpc.git] / test / core / iomgr / ios / CFStreamTests / CFStreamEndpointTests.mm
1 /*
2  *
3  * Copyright 2018 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #import <XCTest/XCTest.h>
20
21 #include "src/core/lib/iomgr/port.h"
22
23 #ifdef GRPC_CFSTREAM
24
25 #include <netinet/in.h>
26
27 #include <grpc/impl/codegen/sync.h>
28 #include <grpc/support/sync.h>
29
30 #include "src/core/lib/iomgr/endpoint.h"
31 #include "src/core/lib/iomgr/resolve_address.h"
32 #include "src/core/lib/iomgr/tcp_client.h"
33 #include "test/core/util/resource_user_util.h"
34 #include "test/core/util/test_config.h"
35
36 static const int kConnectTimeout = 5;
37 static const int kWriteTimeout = 5;
38 static const int kReadTimeout = 5;
39
40 static const int kBufferSize = 10000;
41
42 static const int kRunLoopTimeout = 1;
43
44 static void set_atm(void *arg, grpc_error_handle error) {
45   gpr_atm *p = static_cast<gpr_atm *>(arg);
46   gpr_atm_full_cas(p, -1, reinterpret_cast<gpr_atm>(error));
47 }
48
49 static void init_event_closure(grpc_closure *closure, gpr_atm *atm) {
50   *atm = -1;
51   GRPC_CLOSURE_INIT(closure, set_atm, static_cast<void *>(atm), grpc_schedule_on_exec_ctx);
52 }
53
54 static bool compare_slice_buffer_with_buffer(grpc_slice_buffer *slices, const char *buffer,
55                                              size_t buffer_len) {
56   if (slices->length != buffer_len) {
57     return false;
58   }
59
60   for (int i = 0; i < slices->count; i++) {
61     grpc_slice slice = slices->slices[i];
62     if (0 != memcmp(buffer, GRPC_SLICE_START_PTR(slice), GRPC_SLICE_LENGTH(slice))) {
63       return false;
64     }
65     buffer += GRPC_SLICE_LENGTH(slice);
66   }
67
68   return true;
69 }
70
71 @interface CFStreamEndpointTests : XCTestCase
72
73 @end
74
75 @implementation CFStreamEndpointTests {
76   grpc_endpoint *ep_;
77   int svr_fd_;
78 }
79
80 - (BOOL)waitForEvent:(gpr_atm *)event timeout:(int)timeout {
81   grpc_core::ExecCtx::Get()->Flush();
82
83   NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kConnectTimeout];
84   while (gpr_atm_acq_load(event) == -1 && [deadline timeIntervalSinceNow] > 0) {
85     NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:kRunLoopTimeout];
86     [[NSRunLoop mainRunLoop] runMode:NSDefaultRunLoopMode beforeDate:deadline];
87   }
88
89   return (gpr_atm_acq_load(event) != -1);
90 }
91
92 + (void)setUp {
93   grpc_init();
94 }
95
96 + (void)tearDown {
97   grpc_shutdown();
98 }
99
100 - (void)setUp {
101   self.continueAfterFailure = NO;
102
103   // Set up CFStream connection before testing the endpoint
104
105   grpc_core::ExecCtx exec_ctx;
106
107   grpc_resolved_address resolved_addr;
108   struct sockaddr_in *addr = reinterpret_cast<struct sockaddr_in *>(resolved_addr.addr);
109   int svr_fd;
110   int r;
111   gpr_atm connected = -1;
112   grpc_closure done;
113
114   gpr_log(GPR_DEBUG, "test_succeeds");
115
116   memset(&resolved_addr, 0, sizeof(resolved_addr));
117   resolved_addr.len = sizeof(struct sockaddr_in);
118   addr->sin_family = AF_INET;
119
120   /* create a phony server */
121   svr_fd = socket(AF_INET, SOCK_STREAM, 0);
122   XCTAssertGreaterThanOrEqual(svr_fd, 0);
123   XCTAssertEqual(bind(svr_fd, (struct sockaddr *)addr, (socklen_t)resolved_addr.len), 0);
124   XCTAssertEqual(listen(svr_fd, 1), 0);
125
126   /* connect to it */
127   XCTAssertEqual(getsockname(svr_fd, (struct sockaddr *)addr, (socklen_t *)&resolved_addr.len), 0);
128   init_event_closure(&done, &connected);
129   grpc_tcp_client_connect(&done, &ep_, grpc_slice_allocator_create_unlimited(), nullptr, nullptr,
130                           &resolved_addr, GRPC_MILLIS_INF_FUTURE);
131
132   /* await the connection */
133   do {
134     resolved_addr.len = sizeof(addr);
135     r = accept(svr_fd, reinterpret_cast<struct sockaddr *>(addr),
136                reinterpret_cast<socklen_t *>(&resolved_addr.len));
137   } while (r == -1 && errno == EINTR);
138   XCTAssertGreaterThanOrEqual(r, 0);
139   svr_fd_ = r;
140
141   /* wait for the connection callback to finish */
142   XCTAssertEqual([self waitForEvent:&connected timeout:kConnectTimeout], YES);
143   XCTAssertEqual(reinterpret_cast<grpc_error_handle>(connected), GRPC_ERROR_NONE);
144 }
145
146 - (void)tearDown {
147   grpc_core::ExecCtx exec_ctx;
148   close(svr_fd_);
149   grpc_endpoint_destroy(ep_);
150 }
151
152 - (void)testReadWrite {
153   grpc_core::ExecCtx exec_ctx;
154
155   gpr_atm read;
156   grpc_closure read_done;
157   grpc_slice_buffer read_slices;
158   grpc_slice_buffer read_one_slice;
159   gpr_atm write;
160   grpc_closure write_done;
161   grpc_slice_buffer write_slices;
162
163   grpc_slice slice;
164   char write_buffer[kBufferSize];
165   char read_buffer[kBufferSize];
166   size_t recv_size = 0;
167
168   grpc_slice_buffer_init(&write_slices);
169   slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
170   grpc_slice_buffer_add(&write_slices, slice);
171   init_event_closure(&write_done, &write);
172   grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
173
174   XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
175   XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
176
177   while (recv_size < kBufferSize) {
178     ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
179     XCTAssertGreaterThanOrEqual(size, 0);
180     recv_size += size;
181   }
182
183   XCTAssertEqual(recv_size, kBufferSize);
184   XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
185   ssize_t send_size = send(svr_fd_, read_buffer, kBufferSize, 0);
186   XCTAssertGreaterThanOrEqual(send_size, 0);
187
188   grpc_slice_buffer_init(&read_slices);
189   grpc_slice_buffer_init(&read_one_slice);
190   while (read_slices.length < kBufferSize) {
191     init_event_closure(&read_done, &read);
192     grpc_endpoint_read(ep_, &read_one_slice, &read_done, /*urgent=*/false);
193     XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
194     XCTAssertEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
195     grpc_slice_buffer_move_into(&read_one_slice, &read_slices);
196     XCTAssertLessThanOrEqual(read_slices.length, kBufferSize);
197   }
198   XCTAssertTrue(compare_slice_buffer_with_buffer(&read_slices, read_buffer, kBufferSize));
199
200   grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
201   grpc_slice_buffer_reset_and_unref(&read_slices);
202   grpc_slice_buffer_reset_and_unref(&write_slices);
203   grpc_slice_buffer_reset_and_unref(&read_one_slice);
204 }
205
206 - (void)testShutdownBeforeRead {
207   grpc_core::ExecCtx exec_ctx;
208
209   gpr_atm read;
210   grpc_closure read_done;
211   grpc_slice_buffer read_slices;
212   gpr_atm write;
213   grpc_closure write_done;
214   grpc_slice_buffer write_slices;
215
216   grpc_slice slice;
217   char write_buffer[kBufferSize];
218   char read_buffer[kBufferSize];
219   size_t recv_size = 0;
220
221   grpc_slice_buffer_init(&read_slices);
222   init_event_closure(&read_done, &read);
223   grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
224
225   grpc_slice_buffer_init(&write_slices);
226   slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
227   grpc_slice_buffer_add(&write_slices, slice);
228   init_event_closure(&write_done, &write);
229   grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
230
231   XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
232   XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
233
234   while (recv_size < kBufferSize) {
235     ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
236     XCTAssertGreaterThanOrEqual(size, 0);
237     recv_size += size;
238   }
239
240   XCTAssertEqual(recv_size, kBufferSize);
241   XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
242
243   XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], NO);
244
245   grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
246
247   grpc_core::ExecCtx::Get()->Flush();
248   XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
249   XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
250
251   grpc_slice_buffer_reset_and_unref(&read_slices);
252   grpc_slice_buffer_reset_and_unref(&write_slices);
253 }
254
255 - (void)testRemoteClosed {
256   grpc_core::ExecCtx exec_ctx;
257
258   gpr_atm read;
259   grpc_closure read_done;
260   grpc_slice_buffer read_slices;
261   gpr_atm write;
262   grpc_closure write_done;
263   grpc_slice_buffer write_slices;
264
265   grpc_slice slice;
266   char write_buffer[kBufferSize];
267   char read_buffer[kBufferSize];
268   size_t recv_size = 0;
269
270   init_event_closure(&read_done, &read);
271   grpc_slice_buffer_init(&read_slices);
272   grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
273
274   grpc_slice_buffer_init(&write_slices);
275   slice = grpc_slice_from_static_buffer(write_buffer, kBufferSize);
276   grpc_slice_buffer_add(&write_slices, slice);
277   init_event_closure(&write_done, &write);
278   grpc_endpoint_write(ep_, &write_slices, &write_done, nullptr);
279
280   XCTAssertEqual([self waitForEvent:&write timeout:kWriteTimeout], YES);
281   XCTAssertEqual(reinterpret_cast<grpc_error_handle>(write), GRPC_ERROR_NONE);
282
283   while (recv_size < kBufferSize) {
284     ssize_t size = recv(svr_fd_, read_buffer, kBufferSize, 0);
285     XCTAssertGreaterThanOrEqual(size, 0);
286     recv_size += size;
287   }
288
289   XCTAssertEqual(recv_size, kBufferSize);
290   XCTAssertEqual(memcmp(read_buffer, write_buffer, kBufferSize), 0);
291
292   close(svr_fd_);
293
294   XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
295   XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
296
297   grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
298   grpc_slice_buffer_reset_and_unref(&read_slices);
299   grpc_slice_buffer_reset_and_unref(&write_slices);
300 }
301
302 - (void)testRemoteReset {
303   grpc_core::ExecCtx exec_ctx;
304
305   gpr_atm read;
306   grpc_closure read_done;
307   grpc_slice_buffer read_slices;
308
309   init_event_closure(&read_done, &read);
310   grpc_slice_buffer_init(&read_slices);
311   grpc_endpoint_read(ep_, &read_slices, &read_done, /*urgent=*/false);
312
313   struct linger so_linger;
314   so_linger.l_onoff = 1;
315   so_linger.l_linger = 0;
316   setsockopt(svr_fd_, SOL_SOCKET, SO_LINGER, &so_linger, sizeof(so_linger));
317
318   close(svr_fd_);
319
320   XCTAssertEqual([self waitForEvent:&read timeout:kReadTimeout], YES);
321   XCTAssertNotEqual(reinterpret_cast<grpc_error_handle>(read), GRPC_ERROR_NONE);
322
323   grpc_endpoint_shutdown(ep_, GRPC_ERROR_NONE);
324   grpc_slice_buffer_reset_and_unref(&read_slices);
325 }
326
327 @end
328
329 #else  // GRPC_CFSTREAM
330
331 // Phony test suite
332 @interface CFStreamEndpointTests : XCTestCase
333 @end
334
335 @implementation CFStreamEndpointTests
336 - (void)setUp {
337   [super setUp];
338 }
339
340 - (void)tearDown {
341   [super tearDown];
342 }
343
344 @end
345
346 #endif  // GRPC_CFSTREAM