bae94ef239419f9201a17f42028793406aa9744b
[platform/upstream/grpc.git] / src / objective-c / tests / InteropTests / InteropTests.m
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #import "InteropTests.h"
20
21 #include <grpc/status.h>
22
23 #import <GRPCClient/GRPCCall+ChannelArg.h>
24 #import <GRPCClient/GRPCCall+Cronet.h>
25 #import <GRPCClient/GRPCCall+Interceptor.h>
26 #import <GRPCClient/GRPCCall+Tests.h>
27 #import <GRPCClient/GRPCInterceptor.h>
28 #import <GRPCClient/internal_testing/GRPCCall+InternalTests.h>
29 #import <ProtoRPC/ProtoRPC.h>
30 #import <RxLibrary/GRXBufferedPipe.h>
31 #import <RxLibrary/GRXWriter+Immediate.h>
32 #import <grpc/grpc.h>
33 #import <grpc/support/log.h>
34 #import "src/objective-c/tests/RemoteTestClient/Messages.pbobjc.h"
35 #import "src/objective-c/tests/RemoteTestClient/Test.pbobjc.h"
36 #import "src/objective-c/tests/RemoteTestClient/Test.pbrpc.h"
37
38 #import "InteropTestsBlockCallbacks.h"
39
// Timeout, in seconds, that the unary tests in this file pass to
// -waitForExpectationsWithTimeout:handler: (and use as a wait deadline).
#define TEST_TIMEOUT 32
// NOTE(review): presumably the longer timeout used by streaming-call tests; its uses are not
// visible in this part of the file -- confirm.
#define STREAMING_CALL_TEST_TIMEOUT 64

// NOTE(review): presumably the retry count handed to -retriableTest:retries:timeout:; its uses
// are not visible in this part of the file -- confirm.
static const int kTestRetries = 3;
// Defined in another translation unit; only declared here.
extern const char *kCFStreamVarName;
45
46 // Convenience constructors for the generated proto messages:
47
// Test-only factory for streaming-output-call requests.
@interface RMTStreamingOutputCallRequest (Constructors)
// Returns a request whose payload body is a zero-filled buffer of payloadSize bytes and which
// carries exactly one response-parameters entry whose size is responseSize.
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
                 requestedResponseSize:(NSNumber *)responseSize;
@end

@implementation RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
                 requestedResponseSize:(NSNumber *)responseSize {
  // Describe the single response the caller is asking for.
  RMTResponseParameters *responseParameters = [RMTResponseParameters message];
  responseParameters.size = [responseSize intValue];

  // +dataWithLength: produces a zero-filled buffer of the requested length.
  NSUInteger payloadLength = [payloadSize unsignedIntegerValue];
  NSMutableData *payloadBody = [NSMutableData dataWithLength:payloadLength];

  RMTStreamingOutputCallRequest *message = [self message];
  [message.responseParametersArray addObject:responseParameters];
  message.payload.body = payloadBody;
  return message;
}
@end
64
// Test-only factory for the response a test expects to receive.
@interface RMTStreamingOutputCallResponse (Constructors)
// Returns a response whose payload is of type RMTPayloadType_Compressable and whose body is a
// zero-filled buffer of payloadSize bytes.
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize;
@end

@implementation RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize {
  NSUInteger bodyLength = [payloadSize unsignedIntegerValue];

  RMTStreamingOutputCallResponse *message = [self message];
  message.payload.type = RMTPayloadType_Compressable;
  // +dataWithLength: produces a zero-filled buffer of the requested length.
  message.payload.body = [NSMutableData dataWithLength:bodyLength];
  return message;
}
@end
77
// Returns YES only when host is exactly the remote interop test server's host name.
// A nil host yields NO, because messaging nil returns zero.
BOOL isRemoteInteropTest(NSString *host) {
  static NSString *const remoteInteropHost = @"grpc-test.sandbox.googleapis.com";
  BOOL targetsRemoteServer = [host isEqualToString:remoteInteropHost];
  return targetsRemoteServer;
}
81
// Interceptor factory that vends a plain GRPCInterceptor for every call, each one created with
// its own freshly made serial dispatch queue.
@interface DefaultInterceptorFactory : NSObject <GRPCInterceptorFactory>

- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager;

@end

@implementation DefaultInterceptorFactory

- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
  // A new serial queue per interceptor; it is not shared with any other interceptor.
  dispatch_queue_t interceptorQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
  GRPCInterceptor *interceptor =
      [[GRPCInterceptor alloc] initWithInterceptorManager:interceptorManager
                                            dispatchQueue:interceptorQueue];
  return interceptor;
}

@end
97
// Factory that vends HookInterceptor instances. Every interceptor it creates receives the same
// dispatch queue and the same set of hook blocks that were given to the initializer below.
@interface HookInterceptorFactory : NSObject <GRPCInterceptorFactory>

// Stores the dispatch queue and the hook blocks so they can be handed to each interceptor that
// is created later. Any hook may be nil; a HookInterceptor does nothing for an event whose hook
// is nil.
- (instancetype)
      initWithDispatchQueue:(dispatch_queue_t)dispatchQueue
                  startHook:(void (^)(GRPCRequestOptions *requestOptions,
                                      GRPCCallOptions *callOptions,
                                      GRPCInterceptorManager *manager))startHook
              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
                                      GRPCInterceptorManager *manager))responseHeaderHook
           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
                                      GRPCInterceptorManager *manager))responseCloseHook
           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;

// Returns a new HookInterceptor bound to interceptorManager and configured with the stored
// queue and hooks.
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager;

@end
119
// GRPCInterceptor subclass whose behavior is supplied entirely by blocks: each interceptor event
// is routed to the matching hook, which receives the event's arguments plus the interceptor
// manager.
@interface HookInterceptor : GRPCInterceptor

// Creates an interceptor bound to interceptorManager and dispatchQueue. Each hook may be nil, in
// which case the corresponding event is not acted upon by this interceptor.
- (instancetype)
    initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager
                 dispatchQueue:(dispatch_queue_t)dispatchQueue
                     startHook:(void (^)(GRPCRequestOptions *requestOptions,
                                         GRPCCallOptions *callOptions,
                                         GRPCInterceptorManager *manager))startHook
                 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
                    finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
       receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
                                         GRPCInterceptorManager *manager))receiveNextMessagesHook
            responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
                                         GRPCInterceptorManager *manager))responseHeaderHook
              responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
             responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
                                         GRPCInterceptorManager *manager))responseCloseHook
              didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;

@end
140
@implementation HookInterceptorFactory {
  // The ivars are @protected so that subclasses in this file can read and replace the hooks.
 @protected
  void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                     GRPCInterceptorManager *manager);
  void (^_writeDataHook)(id data, GRPCInterceptorManager *manager);
  void (^_finishHook)(GRPCInterceptorManager *manager);
  void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager);
  void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager);
  void (^_responseDataHook)(id data, GRPCInterceptorManager *manager);
  void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error,
                             GRPCInterceptorManager *manager);
  void (^_didWriteDataHook)(GRPCInterceptorManager *manager);
  dispatch_queue_t _dispatchQueue;
}

// Records the queue and all hook blocks; nothing is created until
// -createInterceptorWithManager: is called.
- (instancetype)
      initWithDispatchQueue:(dispatch_queue_t)dispatchQueue
                  startHook:(void (^)(GRPCRequestOptions *requestOptions,
                                      GRPCCallOptions *callOptions,
                                      GRPCInterceptorManager *manager))startHook
              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
                                      GRPCInterceptorManager *manager))responseHeaderHook
           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
                                      GRPCInterceptorManager *manager))responseCloseHook
           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
  self = [super init];
  if (!self) {
    return nil;
  }

  _dispatchQueue = dispatchQueue;

  // Request-side hooks.
  _startHook = startHook;
  _writeDataHook = writeDataHook;
  _finishHook = finishHook;
  _receiveNextMessagesHook = receiveNextMessagesHook;

  // Response-side hooks.
  _responseHeaderHook = responseHeaderHook;
  _responseDataHook = responseDataHook;
  _responseCloseHook = responseCloseHook;
  _didWriteDataHook = didWriteDataHook;

  return self;
}

// Builds a HookInterceptor for interceptorManager from the stored queue and hooks.
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
  HookInterceptor *interceptor =
      [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager
                                            dispatchQueue:_dispatchQueue
                                                startHook:_startHook
                                            writeDataHook:_writeDataHook
                                               finishHook:_finishHook
                                  receiveNextMessagesHook:_receiveNextMessagesHook
                                       responseHeaderHook:_responseHeaderHook
                                         responseDataHook:_responseDataHook
                                        responseCloseHook:_responseCloseHook
                                         didWriteDataHook:_didWriteDataHook];
  return interceptor;
}

@end
199
@implementation HookInterceptor {
  void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                     GRPCInterceptorManager *manager);
  void (^_writeDataHook)(id data, GRPCInterceptorManager *manager);
  void (^_finishHook)(GRPCInterceptorManager *manager);
  void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager);
  void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager);
  void (^_responseDataHook)(id data, GRPCInterceptorManager *manager);
  void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error,
                             GRPCInterceptorManager *manager);
  void (^_didWriteDataHook)(GRPCInterceptorManager *manager);
  // This class's own references to the manager and queue given to the initializer.
  GRPCInterceptorManager *_manager;
  dispatch_queue_t _dispatchQueue;
}

// Returns the queue that was passed to the initializer.
- (dispatch_queue_t)dispatchQueue {
  return _dispatchQueue;
}

// Initializes the superclass with the manager and queue, then records the hooks together with
// this class's own references to the manager and queue.
- (instancetype)
    initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager
                 dispatchQueue:(dispatch_queue_t)dispatchQueue
                     startHook:(void (^)(GRPCRequestOptions *requestOptions,
                                         GRPCCallOptions *callOptions,
                                         GRPCInterceptorManager *manager))startHook
                 writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
                    finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
       receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
                                         GRPCInterceptorManager *manager))receiveNextMessagesHook
            responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
                                         GRPCInterceptorManager *manager))responseHeaderHook
              responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
             responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
                                         GRPCInterceptorManager *manager))responseCloseHook
              didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
  self = [super initWithInterceptorManager:interceptorManager dispatchQueue:dispatchQueue];
  if (!self) {
    return nil;
  }

  _manager = interceptorManager;
  _dispatchQueue = dispatchQueue;

  // Request-side hooks.
  _startHook = startHook;
  _writeDataHook = writeDataHook;
  _finishHook = finishHook;
  _receiveNextMessagesHook = receiveNextMessagesHook;

  // Response-side hooks.
  _responseHeaderHook = responseHeaderHook;
  _responseDataHook = responseDataHook;
  _responseCloseHook = responseCloseHook;
  _didWriteDataHook = didWriteDataHook;

  return self;
}

#pragma mark Request-side events

// Every override below follows one rule: when the matching hook is nil the method does nothing;
// otherwise the hook is called with the event's arguments followed by the interceptor manager.

- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
                    callOptions:(GRPCCallOptions *)callOptions {
  if (!_startHook) {
    return;
  }
  _startHook(requestOptions, callOptions, _manager);
}

- (void)writeData:(id)data {
  if (!_writeDataHook) {
    return;
  }
  _writeDataHook(data, _manager);
}

- (void)finish {
  if (!_finishHook) {
    return;
  }
  _finishHook(_manager);
}

- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  if (!_receiveNextMessagesHook) {
    return;
  }
  _receiveNextMessagesHook(numberOfMessages, _manager);
}

#pragma mark Response-side events

- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
  if (!_responseHeaderHook) {
    return;
  }
  _responseHeaderHook(initialMetadata, _manager);
}

- (void)didReceiveData:(id)data {
  if (!_responseDataHook) {
    return;
  }
  _responseDataHook(data, _manager);
}

- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  if (!_responseCloseHook) {
    return;
  }
  _responseCloseHook(trailingMetadata, error, _manager);
}

- (void)didWriteData {
  if (!_didWriteDataHook) {
    return;
  }
  _didWriteDataHook(_manager);
}

@end
300
// HookInterceptorFactory variant whose hooks can be replaced after creation and which can be
// switched on and off.
@interface GlobalInterceptorFactory : HookInterceptorFactory

// While NO, -createInterceptorWithManager: returns nil instead of an interceptor.
@property BOOL enabled;

// Creates a factory with the given dispatch queue, no hooks installed, and enabled set to NO.
- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue;

// Replaces all eight hooks at once. Passing nil for a hook removes it.
- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                               GRPCInterceptorManager *manager))startHook
              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
                                      GRPCInterceptorManager *manager))responseHeaderHook
           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
                                      GRPCInterceptorManager *manager))responseCloseHook
           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;

@end
321
@implementation GlobalInterceptorFactory

// Creates a factory with no hooks installed and with interception disabled.
//
// The superclass initializer runs first and its result is assigned to self before any ivar of
// this class is touched, following the standard initializer pattern.
- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue {
  self = [super initWithDispatchQueue:dispatchQueue
                            startHook:nil
                        writeDataHook:nil
                           finishHook:nil
              receiveNextMessagesHook:nil
                   responseHeaderHook:nil
                     responseDataHook:nil
                    responseCloseHook:nil
                     didWriteDataHook:nil];
  if (self) {
    _enabled = NO;
  }
  return self;
}

// Returns nil while the factory is disabled. When enabled, returns the HookInterceptor that the
// superclass builds from the stored dispatch queue and the currently installed hooks.
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
  if (!_enabled) {
    return nil;
  }
  // HookInterceptorFactory already constructs the interceptor from these same ivars, so reuse it
  // rather than repeating the ten-argument initializer call here.
  return [super createInterceptorWithManager:interceptorManager];
}

// Replaces every stored hook with the supplied block; a nil argument clears that hook.
// Interceptors created before this call keep the hooks they were created with.
- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                               GRPCInterceptorManager *manager))startHook
              writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
                 finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
    receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
                                      GRPCInterceptorManager *manager))receiveNextMessagesHook
         responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
                                      GRPCInterceptorManager *manager))responseHeaderHook
           responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
          responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
                                      GRPCInterceptorManager *manager))responseCloseHook
           didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
  _startHook = startHook;
  _writeDataHook = writeDataHook;
  _finishHook = finishHook;
  _receiveNextMessagesHook = receiveNextMessagesHook;
  _responseHeaderHook = responseHeaderHook;
  _responseDataHook = responseDataHook;
  _responseCloseHook = responseCloseHook;
  _didWriteDataHook = didWriteDataHook;
}

@end
377
// The single GlobalInterceptorFactory shared by the tests. It is created and registered with
// GRPCCall2 exactly once, in +[InteropTests setUp].
static GlobalInterceptorFactory *globalInterceptorFactory = nil;
// dispatch_once token that guards the one-time creation and registration above.
static dispatch_once_t initGlobalInterceptorFactory;
380
381 #pragma mark Tests
382
383 @implementation InteropTests {
384   RMTTestService *_service;
385 }
386
387 #pragma clang diagnostic push
388 #pragma clang diagnostic ignored "-Warc-performSelector-leaks"
// Runs the test method named by selector on a background queue and waits up to timeout seconds
// for it to return. If it does not return in time, the fixture is reset with -tearDown and
// -setUp and the selector is started again, for at most retries attempts in total. An attempt
// that timed out is not cancelled; its block keeps running in the background.
- (void)retriableTest:(SEL)selector retries:(int)retries timeout:(NSTimeInterval)timeout {
  int attempt = 0;
  while (attempt < retries) {
    NSDate *deadline = [NSDate dateWithTimeIntervalSinceNow:timeout];
    NSCondition *completion = [[NSCondition alloc] init];
    __block BOOL finished = NO;

    // Take the lock before dispatching so the completion signal cannot be sent before this
    // thread starts waiting on the condition.
    [completion lock];
    dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0), ^{
      [self performSelector:selector];
      [completion lock];
      finished = YES;
      [completion signal];
      [completion unlock];
    });

    // Loop to tolerate wakeups that happen before either completion or the deadline.
    while (!finished && [deadline timeIntervalSinceNow] > 0) {
      [completion waitUntilDate:deadline];
    }
    BOOL completedInTime = finished;
    [completion unlock];

    if (completedInTime) {
      return;
    }

    // Timed out: reset the fixture before the next attempt.
    [self tearDown];
    [self setUp];
    attempt++;
  }
}
415 #pragma clang diagnostic pop
416
// InteropTests itself gets a freshly created suite named "InteropTestsEmptySuite" to which no
// tests are added; every other class (i.e. a subclass) gets the suite its superclass builds.
+ (XCTestSuite *)defaultTestSuite {
  BOOL isBaseClass = (self == [InteropTests class]);
  return isBaseClass ? [XCTestSuite testSuiteWithName:@"InteropTestsEmptySuite"]
                     : [super defaultTestSuite];
}
424
// Host of the interop server under test. This base implementation returns nil, in which case
// -setUp leaves _service nil; the tests in this file assert that the host is non-nil.
+ (NSString *)host {
  return nil;
}
428
// The number of bytes of overhead that Protocol Buffers encoding adds to a message. The number
// varies because different servers use different message.proto definitions. The actual number
// for each interop server is overridden in the corresponding derived test class; this base
// implementation returns 0.
- (int32_t)encodingOverhead {
  return 0;
}
435
// For backwards compatibility. The V2 API tests in this file assign this value to the
// transportType call option; the base implementation selects GRPCTransportTypeChttp2BoringSSL.
+ (GRPCTransportType)transportType {
  return GRPCTransportTypeChttp2BoringSSL;
}
440
// Transport identifier that the V2 API tests in this file assign to the transport call option.
// This base implementation returns NULL.
+ (GRPCTransportID)transport {
  return NULL;
}
444
// Value that the V2 API tests in this file assign to the PEMRootCertificates call option.
// This base implementation returns nil.
+ (NSString *)PEMRootCertificates {
  return nil;
}
448
// Value that the V2 API tests in this file assign to the hostNameOverride call option.
// This base implementation returns nil.
+ (NSString *)hostNameOverride {
  return nil;
}
452
// Class-level fixture. Creates the shared GlobalInterceptorFactory, backed by its own serial
// queue, and registers it with GRPCCall2. dispatch_once ensures this happens a single time no
// matter how many test classes run this method.
+ (void)setUp {
  // Lifecycle overrides should give the superclass a chance to run its own set-up first.
  [super setUp];

  dispatch_once(&initGlobalInterceptorFactory, ^{
    dispatch_queue_t globalInterceptorQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    globalInterceptorFactory =
        [[GlobalInterceptorFactory alloc] initWithDispatchQueue:globalInterceptorQueue];
    [GRPCCall2 registerGlobalInterceptor:globalInterceptorFactory];
  });
}
461
// Per-test fixture. Stops the test at the first failed assertion, resets gRPC host settings,
// closes open connections, and creates the service stub for the class's host (or leaves it nil
// when the class provides no host).
- (void)setUp {
  // Lifecycle overrides should give the superclass a chance to run its own set-up first.
  [super setUp];

  self.continueAfterFailure = NO;

  [GRPCCall resetHostSettings];

#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
  [GRPCCall closeOpenConnections];
#pragma clang diagnostic pop

  // Ask the class for its host once and reuse the answer.
  NSString *host = [[self class] host];
  _service = host ? [RMTTestService serviceWithHost:host] : nil;
}
474
// Sends an empty request through the handler-based API and expects an empty response with no
// error.
- (void)testEmptyUnaryRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *callFinished = [self expectationWithDescription:@"EmptyUnary"];

  void (^onResponse)(GPBEmpty *, NSError *) = ^(GPBEmpty *response, NSError *error) {
    XCTAssertNil(error, @"Finished with unexpected error: %@", error);

    id expectedResponse = [GPBEmpty message];
    XCTAssertEqualObjects(response, expectedResponse);

    [callFinished fulfill];
  };

  GPBEmpty *emptyRequest = [GPBEmpty message];
  [_service emptyCallWithRequest:emptyRequest handler:onResponse];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
493
// Sends an empty request through the V2 API and expects one empty response message followed by
// a close callback without an error.
- (void)testEmptyUnaryRPCWithV2API {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *messageExpectation =
      [self expectationWithDescription:@"EmptyUnaryWithV2API received message"];
  __weak XCTestExpectation *completionExpectation =
      [self expectationWithDescription:@"EmptyUnaryWithV2API completed"];

  GRPCMutableCallOptions *callOptions = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  callOptions.transportType = [[self class] transportType];
  callOptions.transport = [[self class] transport];
  callOptions.PEMRootCertificates = [[self class] PEMRootCertificates];
  callOptions.hostNameOverride = [[self class] hostNameOverride];

  // A nil message is ignored; only a real message fulfills the expectation.
  void (^onMessage)(id) = ^(id message) {
    if (message) {
      id expectedResponse = [GPBEmpty message];
      XCTAssertEqualObjects(message, expectedResponse);
      [messageExpectation fulfill];
    }
  };
  void (^onClose)(NSDictionary *, NSError *) = ^(NSDictionary *trailingMetadata, NSError *error) {
    XCTAssertNil(error, @"Unexpected error: %@", error);
    [completionExpectation fulfill];
  };
  InteropTestsBlockCallbacks *callbacks =
      [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                                                          messageCallback:onMessage
                                                            closeCallback:onClose];

  GPBEmpty *emptyRequest = [GPBEmpty message];
  GRPCUnaryProtoCall *unaryCall = [_service emptyCallWithMessage:emptyRequest
                                                 responseHandler:callbacks
                                                     callOptions:callOptions];
  [unaryCall start];
  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
527
// Test that responses can be dispatched even if we do not run main run-loop.
// The test thread blocks on an NSCondition rather than on XCTest expectations while it waits
// for the close callback.
- (void)testAsyncDispatchWithV2API {
  XCTAssertNotNil([[self class] host]);

  GRPCMutableCallOptions *callOptions = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  callOptions.transportType = [[self class] transportType];
  callOptions.transport = [[self class] transport];
  callOptions.PEMRootCertificates = [[self class] PEMRootCertificates];
  callOptions.hostNameOverride = [[self class] hostNameOverride];

  // Both flags are written only while the condition's lock is held.
  NSCondition *callState = [[NSCondition alloc] init];
  __block BOOL receivedMessage = NO;
  __block BOOL closed = NO;

  void (^onMessage)(id) = ^(id message) {
    if (message) {
      id expectedResponse = [GPBEmpty message];
      XCTAssertEqualObjects(message, expectedResponse);
      [callState lock];
      receivedMessage = YES;
      [callState unlock];
    }
  };
  void (^onClose)(NSDictionary *, NSError *) = ^(NSDictionary *trailingMetadata, NSError *error) {
    XCTAssertNil(error, @"Unexpected error: %@", error);
    [callState lock];
    closed = YES;
    [callState signal];
    [callState unlock];
  };
  InteropTestsBlockCallbacks *callbacks =
      [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                                                          messageCallback:onMessage
                                                            closeCallback:onClose];

  GPBEmpty *emptyRequest = [GPBEmpty message];
  GRPCUnaryProtoCall *unaryCall = [_service emptyCallWithMessage:emptyRequest
                                                 responseHandler:callbacks
                                                     callOptions:callOptions];

  NSDate *waitDeadline = [NSDate dateWithTimeIntervalSinceNow:TEST_TIMEOUT];
  [unaryCall start];

  [callState lock];
  // Loop to tolerate wakeups that happen before either the close callback or the deadline.
  while (!closed && [waitDeadline timeIntervalSinceNow] > 0) {
    [callState waitUntilDate:waitDeadline];
  }
  XCTAssertTrue(receivedMessage);
  XCTAssertTrue(closed);
  [callState unlock];
}
575
// Sends a 271828-byte payload through the handler-based API and expects a compressable
// 314159-byte zero-filled payload back with no error.
- (void)testLargeUnaryRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *callFinished = [self expectationWithDescription:@"LargeUnary"];

  const int32_t responseSize = 314159;
  const NSUInteger requestPayloadSize = 271828;

  RMTSimpleRequest *largeRequest = [RMTSimpleRequest message];
  largeRequest.responseType = RMTPayloadType_Compressable;
  largeRequest.responseSize = responseSize;
  largeRequest.payload.body = [NSMutableData dataWithLength:requestPayloadSize];

  void (^onResponse)(RMTSimpleResponse *, NSError *) =
      ^(RMTSimpleResponse *response, NSError *error) {
        XCTAssertNil(error, @"Finished with unexpected error: %@", error);

        RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
        expectedResponse.payload.type = RMTPayloadType_Compressable;
        expectedResponse.payload.body = [NSMutableData dataWithLength:(NSUInteger)responseSize];
        XCTAssertEqualObjects(response, expectedResponse);

        [callFinished fulfill];
      };
  [_service unaryCallWithRequest:largeRequest handler:onResponse];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
599
// Runs the same large unary call twice through GRPCUnaryResponseHandler, once with a private
// serial response queue and once with a nil response queue, and checks the response body plus
// the echoed initial header and binary trailer on each handler.
- (void)testUnaryResponseHandler {
  XCTAssertNotNil([[self class] host]);
  // The test does not work on a remote server since it does not echo a trailer
  if ([[self class] isRemoteTest]) {
    return;
  }
  XCTestExpectation *serialQueueDone = [self expectationWithDescription:@"call complete"];
  XCTestExpectation *nilQueueDone =
      [self expectationWithDescription:@"main queue call complete"];

  const int32_t responseSize = 314159;
  const NSUInteger requestPayloadSize = 271828;

  RMTSimpleRequest *largeRequest = [RMTSimpleRequest message];
  largeRequest.responseType = RMTPayloadType_Compressable;
  largeRequest.responseSize = responseSize;
  largeRequest.payload.body = [NSMutableData dataWithLength:requestPayloadSize];

  GRPCMutableCallOptions *callOptions = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  callOptions.transportType = [[self class] transportType];
  callOptions.transport = [[self class] transport];
  callOptions.PEMRootCertificates = [[self class] PEMRootCertificates];
  callOptions.hostNameOverride = [[self class] hostNameOverride];

  // Metadata sent with the request; the assertions below expect both values to come back.
  const unsigned char trailerBytes[] = {1, 2, 3, 4};
  NSData *trailerData = [NSData dataWithBytes:trailerBytes length:sizeof(trailerBytes)];
  callOptions.initialMetadata = @{
    @"x-grpc-test-echo-trailing-bin" : trailerData,
    @"x-grpc-test-echo-initial" : @"test-header"
  };

  // Each handler variable is __block because its own response block reads the handler back to
  // inspect the received headers and trailers.
  dispatch_queue_t serialResponseQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
  __block GRPCUnaryResponseHandler *serialQueueHandler = [[GRPCUnaryResponseHandler alloc]
      initWithResponseHandler:^(GPBMessage *response, NSError *error) {
        XCTAssertNil(error, @"Unexpected error: %@", error);
        RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
        expectedResponse.payload.type = RMTPayloadType_Compressable;
        expectedResponse.payload.body = [NSMutableData dataWithLength:(NSUInteger)responseSize];
        XCTAssertEqualObjects(response, expectedResponse);
        XCTAssertEqualObjects(serialQueueHandler.responseHeaders[@"x-grpc-test-echo-initial"],
                              @"test-header");
        XCTAssertEqualObjects(
            serialQueueHandler.responseTrailers[@"x-grpc-test-echo-trailing-bin"], trailerData);
        [serialQueueDone fulfill];
      }
        responseDispatchQueue:serialResponseQueue];
  __block GRPCUnaryResponseHandler *nilQueueHandler = [[GRPCUnaryResponseHandler alloc]
      initWithResponseHandler:^(GPBMessage *response, NSError *error) {
        XCTAssertNil(error, @"Unexpected error: %@", error);
        RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
        expectedResponse.payload.type = RMTPayloadType_Compressable;
        expectedResponse.payload.body = [NSMutableData dataWithLength:(NSUInteger)responseSize];
        XCTAssertEqualObjects(response, expectedResponse);
        XCTAssertEqualObjects(nilQueueHandler.responseHeaders[@"x-grpc-test-echo-initial"],
                              @"test-header");
        XCTAssertEqualObjects(nilQueueHandler.responseTrailers[@"x-grpc-test-echo-trailing-bin"],
                              trailerData);
        [nilQueueDone fulfill];
      }
        responseDispatchQueue:nil];

  GRPCUnaryProtoCall *serialQueueCall = [_service unaryCallWithMessage:largeRequest
                                                       responseHandler:serialQueueHandler
                                                           callOptions:callOptions];
  [serialQueueCall start];
  GRPCUnaryProtoCall *nilQueueCall = [_service unaryCallWithMessage:largeRequest
                                                    responseHandler:nilQueueHandler
                                                        callOptions:callOptions];
  [nilQueueCall start];
  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
659
// Sends a 271828-byte payload through the V2 API and expects one compressable 314159-byte
// zero-filled response message followed by a close callback without an error.
- (void)testLargeUnaryRPCWithV2API {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *messageExpectation =
      [self expectationWithDescription:@"LargeUnaryWithV2API received message"];
  __weak XCTestExpectation *completionExpectation =
      [self expectationWithDescription:@"LargeUnaryWithV2API received complete"];

  const int32_t responseSize = 314159;
  const NSUInteger requestPayloadSize = 271828;

  RMTSimpleRequest *largeRequest = [RMTSimpleRequest message];
  largeRequest.responseType = RMTPayloadType_Compressable;
  largeRequest.responseSize = responseSize;
  largeRequest.payload.body = [NSMutableData dataWithLength:requestPayloadSize];

  GRPCMutableCallOptions *callOptions = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  callOptions.transportType = [[self class] transportType];
  callOptions.transport = [[self class] transport];
  callOptions.PEMRootCertificates = [[self class] PEMRootCertificates];
  callOptions.hostNameOverride = [[self class] hostNameOverride];

  // A nil message fails the assertion; the expectation is fulfilled only for a real message.
  void (^onMessage)(id) = ^(id message) {
    XCTAssertNotNil(message);
    if (message) {
      RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
      expectedResponse.payload.type = RMTPayloadType_Compressable;
      expectedResponse.payload.body = [NSMutableData dataWithLength:(NSUInteger)responseSize];
      XCTAssertEqualObjects(message, expectedResponse);

      [messageExpectation fulfill];
    }
  };
  void (^onClose)(NSDictionary *, NSError *) = ^(NSDictionary *trailingMetadata, NSError *error) {
    XCTAssertNil(error, @"Unexpected error: %@", error);
    [completionExpectation fulfill];
  };
  InteropTestsBlockCallbacks *callbacks =
      [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                                                          messageCallback:onMessage
                                                            closeCallback:onClose];

  GRPCUnaryProtoCall *unaryCall = [_service unaryCallWithMessage:largeRequest
                                                 responseHandler:callbacks
                                                     callOptions:callOptions];
  [unaryCall start];
  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
703
// Creates ten V2 unary calls, some of which ask the server for a non-OK status, starts them all
// back to back, and waits until every call has reported its close callback. The close callback
// does not inspect the error; successful responses are compared against the expected payload.
- (void)testConcurrentRPCsWithErrorsWithV2API {
  const int kNumRpcs = 10;
  NSMutableArray *pendingCalls = [NSMutableArray arrayWithCapacity:kNumRpcs];

  // Phase 1: build every call (and its expectation) before any of them is started.
  for (int rpcIndex = 0; rpcIndex < kNumRpcs; ++rpcIndex) {
    NSString *description =
        [NSString stringWithFormat:@"Received trailer for RPC %d", rpcIndex];
    XCTestExpectation *trailerReceived = [self expectationWithDescription:description];

    RMTSimpleRequest *rpcRequest = [RMTSimpleRequest message];
    rpcRequest.responseType = RMTPayloadType_Compressable;
    rpcRequest.responseSize = 314159;
    rpcRequest.payload.body = [NSMutableData dataWithLength:271828];
    // Multiples of 3 request UNAVAILABLE; remaining multiples of 7 request CANCELLED.
    if (rpcIndex % 3 == 0) {
      rpcRequest.responseStatus.code = GRPC_STATUS_UNAVAILABLE;
    } else if (rpcIndex % 7 == 0) {
      rpcRequest.responseStatus.code = GRPC_STATUS_CANCELLED;
    }

    GRPCMutableCallOptions *callOptions = [[GRPCMutableCallOptions alloc] init];
    // transportType is set alongside transport for backwards compatibility.
    callOptions.transportType = [[self class] transportType];
    callOptions.transport = [[self class] transport];
    callOptions.PEMRootCertificates = [[self class] PEMRootCertificates];
    callOptions.hostNameOverride = [[self class] hostNameOverride];

    void (^onMessage)(id) = ^(id message) {
      if (!message) {
        return;
      }
      RMTSimpleResponse *wanted = [RMTSimpleResponse message];
      wanted.payload.type = RMTPayloadType_Compressable;
      wanted.payload.body = [NSMutableData dataWithLength:314159];
      XCTAssertEqualObjects(message, wanted);
    };
    void (^onClose)(NSDictionary *, NSError *) =
        ^(NSDictionary *trailingMetadata, NSError *error) {
          [trailerReceived fulfill];
        };

    InteropTestsBlockCallbacks *responseHandler =
        [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                                                            messageCallback:onMessage
                                                              closeCallback:onClose];
    GRPCUnaryProtoCall *call = [_service unaryCallWithMessage:rpcRequest
                                              responseHandler:responseHandler
                                                  callOptions:callOptions];
    [pendingCalls addObject:call];
  }

  // Phase 2: start the calls in creation order.
  for (GRPCUnaryProtoCall *call in pendingCalls) {
    [call start];
  }
  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
755
// Body of testConcurrentRPCsWithErrors. Starts ten V1 unary calls, some of which ask the server
// for a non-OK status, and blocks the calling thread on a condition variable until all handlers
// have run or TEST_TIMEOUT seconds have elapsed. Only successful responses are verified.
- (void)concurrentRPCsWithErrors {
  const int kNumRpcs = 10;
  __block int finishedCalls = 0;
  NSCondition *allFinished = [[NSCondition alloc] init];
  NSDate *deadline = [[NSDate date] dateByAddingTimeInterval:TEST_TIMEOUT];

  // The lock is taken before any call starts and is only released inside waitUntilDate:, so
  // handlers (which lock the same condition) cannot touch the counter until this thread waits.
  [allFinished lock];
  for (int rpcIndex = 0; rpcIndex < kNumRpcs; ++rpcIndex) {
    RMTSimpleRequest *rpcRequest = [RMTSimpleRequest message];
    rpcRequest.responseType = RMTPayloadType_Compressable;
    rpcRequest.responseSize = 314159;
    rpcRequest.payload.body = [NSMutableData dataWithLength:271828];
    // Multiples of 3 request UNAVAILABLE; remaining multiples of 7 request CANCELLED.
    if (rpcIndex % 3 == 0) {
      rpcRequest.responseStatus.code = GRPC_STATUS_UNAVAILABLE;
    } else if (rpcIndex % 7 == 0) {
      rpcRequest.responseStatus.code = GRPC_STATUS_CANCELLED;
    }

    void (^onFinished)(RMTSimpleResponse *, NSError *) =
        ^(RMTSimpleResponse *response, NSError *error) {
          if (error == nil) {
            RMTSimpleResponse *wanted = [RMTSimpleResponse message];
            wanted.payload.type = RMTPayloadType_Compressable;
            wanted.payload.body = [NSMutableData dataWithLength:314159];
            XCTAssertEqualObjects(response, wanted);
          }
          // Count this completion and wake the waiting thread once the last call is done.
          [allFinished lock];
          finishedCalls += 1;
          if (finishedCalls == kNumRpcs) {
            [allFinished signal];
          }
          [allFinished unlock];
        };

    GRPCProtoCall *call = [_service RPCToUnaryCallWithRequest:rpcRequest handler:onFinished];
    // Each call delivers its response on its own serial queue.
    [call setResponseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)];
    [call start];
  }

  // Loop to absorb wakeups that arrive before every call has finished.
  while (finishedCalls < kNumRpcs && [deadline timeIntervalSinceNow] > 0) {
    [allFinished waitUntilDate:deadline];
  }
  [allFinished unlock];
}
797
// Runs concurrentRPCsWithErrors through the retry helper: up to kTestRetries attempts, with a
// timeout argument of 10 passed to the helper.
- (void)testConcurrentRPCsWithErrors {
  SEL testBody = @selector(concurrentRPCsWithErrors);
  [self retriableTest:testBody retries:kTestRetries timeout:10];
}
801
// Makes a small unary call with op-batch logging enabled and passes only if the log contains a
// batch of exactly three ops, which indicates that the send ops were coalesced.
- (void)testPacketCoalescing {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *coalescedBatchSeen = [self expectationWithDescription:@"LargeUnary"];

  RMTSimpleRequest *smallRequest = [RMTSimpleRequest message];
  smallRequest.responseType = RMTPayloadType_Compressable;
  smallRequest.responseSize = 10;
  smallRequest.payload.body = [NSMutableData dataWithLength:10];

  void (^onFinished)(RMTSimpleResponse *, NSError *) =
      ^(RMTSimpleResponse *response, NSError *error) {
        XCTAssertNil(error, @"Finished with unexpected error: %@", error);

        RMTSimpleResponse *wanted = [RMTSimpleResponse message];
        wanted.payload.type = RMTPayloadType_Compressable;
        wanted.payload.body = [NSMutableData dataWithLength:10];
        XCTAssertEqualObjects(response, wanted);

        // Success means one batch holds exactly 3 ops (SEND_INITIAL_METADATA, SEND_MESSAGE,
        // SEND_CLOSE_FROM_CLIENT). Without packet coalescing every batch holds a single op.
        const NSInteger kExpectedOpBatchSize = 3;
        NSArray *loggedBatches = [GRPCCall obtainAndCleanOpBatchLog];
        for (NSObject *entry in loggedBatches) {
          if (![entry isKindOfClass:[NSArray class]]) {
            continue;
          }
          NSArray *ops = (NSArray *)entry;
          if ([ops count] != kExpectedOpBatchSize) {
            continue;
          }
          [coalescedBatchSeen fulfill];
          break;
        }
      };

  [GRPCCall enableOpBatchLog:YES];
  [_service unaryCallWithRequest:smallRequest handler:onFinished];

  [self waitForExpectationsWithTimeout:16 handler:nil];
  // Switch logging back off once the wait has returned.
  [GRPCCall enableOpBatchLog:NO];
}
840
// Requests a response whose payload is 4MB minus self.encodingOverhead and expects it to be
// delivered in full without an error.
- (void)test4MBResponsesAreAccepted {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *responseAccepted =
      [self expectationWithDescription:@"MaxResponseSize"];

  // 4MB - encoding overhead
  const int32_t kPayloadSize = 4 * 1024 * 1024 - self.encodingOverhead;
  RMTSimpleRequest *sizedRequest = [RMTSimpleRequest message];
  sizedRequest.responseSize = kPayloadSize;

  void (^onFinished)(RMTSimpleResponse *, NSError *) =
      ^(RMTSimpleResponse *response, NSError *error) {
        XCTAssertNil(error, @"Finished with unexpected error: %@", error);
        XCTAssertEqual(response.payload.body.length, kPayloadSize);
        [responseAccepted fulfill];
      };
  [_service unaryCallWithRequest:sizedRequest handler:onFinished];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
858
// Requests a response one byte larger than the biggest payload accepted by default and expects
// the call to fail with the "Received message larger than max" error description.
- (void)testResponsesOverMaxSizeFailWithActionableMessage {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *failureReported =
      [self expectationWithDescription:@"ResponseOverMaxSize"];

  // 1B over max size
  const int32_t kPayloadSize = 4 * 1024 * 1024 - self.encodingOverhead + 1;
  RMTSimpleRequest *oversizedRequest = [RMTSimpleRequest message];
  oversizedRequest.responseSize = kPayloadSize;

  void (^onFinished)(RMTSimpleResponse *, NSError *) =
      ^(RMTSimpleResponse *response, NSError *error) {
        // TODO(jcanizales): Catch the error and rethrow it with an actionable message:
        // - Use +[GRPCCall setResponseSizeLimit:forHost:] to set a higher limit.
        // - If you're developing the server, consider using response streaming, or let clients
        //   filter responses by setting a google.protobuf.FieldMask in the request:
        //   https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/field_mask.proto
        NSString *wantedDescription = @"Received message larger than max (4194305 vs. 4194304)";
        XCTAssertEqualObjects(error.localizedDescription, wantedDescription);
        [failureReported fulfill];
      };
  [_service unaryCallWithRequest:oversizedRequest handler:onFinished];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
884
// Raises the response size limit for the test host to 6MB, requests a 5MB response, and verifies
// that the call finishes without error AND that the full 5MB payload was actually delivered.
// Note that the raised limit is set through a GRPCCall class method and is not reset here.
- (void)testResponsesOver4MBAreAcceptedIfOptedIn {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *expectation =
      [self expectationWithDescription:@"HigherResponseSizeLimit"];
  __block NSError *callError = nil;
  __block NSUInteger receivedBodyLength = 0;

  RMTSimpleRequest *request = [RMTSimpleRequest message];
  // int32_t matches the type the other response-size tests in this file use for responseSize
  // (presumably an int32 proto field), avoiding an implicit size_t conversion.
  const int32_t kPayloadSize = 5 * 1024 * 1024;  // 5MB
  request.responseSize = kPayloadSize;

  [GRPCCall setResponseSizeLimit:6 * 1024 * 1024 forHost:[[self class] host]];
  [_service unaryCallWithRequest:request
                         handler:^(RMTSimpleResponse *response, NSError *error) {
                           // Record the outcome; it is asserted after the wait returns.
                           callError = error;
                           receivedBodyLength = response.payload.body.length;
                           [expectation fulfill];
                         }];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
  XCTAssertNil(callError, @"Finished with unexpected error: %@", callError);
  // An error-free call is not enough: the over-4MB payload must have arrived in full.
  XCTAssertEqual(receivedBodyLength, (NSUInteger)kPayloadSize,
                 @"Response payload is missing or truncated.");
}
905
// Streams four requests (payloads of 27182, 8, 1828 and 45904 bytes) to the server and expects a
// single response reporting an aggregated payload size of 74922 bytes.
- (void)testClientStreamingRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *responseReceived =
      [self expectationWithDescription:@"ClientStreaming"];

  // Build one request per payload size, preserving the order in which they are sent.
  NSArray *payloadSizes = @[ @27182, @8, @1828, @45904 ];
  NSMutableArray *outgoing = [NSMutableArray arrayWithCapacity:payloadSizes.count];
  for (NSNumber *payloadSize in payloadSizes) {
    RMTStreamingInputCallRequest *streamRequest = [RMTStreamingInputCallRequest message];
    streamRequest.payload.body =
        [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
    [outgoing addObject:streamRequest];
  }
  GRXWriter *requestsWriter = [GRXWriter writerWithContainer:[outgoing copy]];

  void (^onFinished)(RMTStreamingInputCallResponse *, NSError *) =
      ^(RMTStreamingInputCallResponse *response, NSError *error) {
        XCTAssertNil(error, @"Finished with unexpected error: %@", error);

        RMTStreamingInputCallResponse *wanted = [RMTStreamingInputCallResponse message];
        wanted.aggregatedPayloadSize = 74922;
        XCTAssertEqualObjects(response, wanted);

        [responseReceived fulfill];
      };
  [_service streamingInputCallWithRequestsWriter:requestsWriter handler:onFinished];

  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
940
// Asks the server to stream back one response per entry of expectedSizes and verifies that the
// responses arrive in order, carry the requested payload sizes, and that exactly that many
// responses were received when the call finishes.
- (void)testServerStreamingRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *expectation = [self expectationWithDescription:@"ServerStreaming"];

  NSArray *expectedSizes = @[ @31415, @9, @2653, @58979 ];
  // Derived from the array so the checks below cannot drift from the request that is built.
  const int kExpectedResponses = (int)expectedSizes.count;

  RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message];
  for (NSNumber *size in expectedSizes) {
    RMTResponseParameters *parameters = [RMTResponseParameters message];
    parameters.size = [size intValue];
    [request.responseParametersArray addObject:parameters];
  }

  __block int index = 0;
  [_service
      streamingOutputCallWithRequest:request
                        eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
                                       NSError *error) {
                          XCTAssertNil(error, @"Finished with unexpected error: %@", error);
                          XCTAssertTrue(done || response,
                                        @"Event handler called without an event.");

                          if (response) {
                            XCTAssertLessThan(index, kExpectedResponses,
                                              @"More than %i responses received.",
                                              kExpectedResponses);
                            // A failed assertion does not necessarily stop this handler (XCTest
                            // continues after failures by default), so guard the array lookup:
                            // an unexpected extra response must be reported as a test failure,
                            // not raise an out-of-range exception.
                            if (index < kExpectedResponses) {
                              id expected = [RMTStreamingOutputCallResponse
                                  messageWithPayloadSize:expectedSizes[index]];
                              XCTAssertEqualObjects(response, expected);
                              index += 1;
                            }
                          }

                          if (done) {
                            XCTAssertEqual(index, kExpectedResponses,
                                           @"Received %i responses instead of %i.", index,
                                           kExpectedResponses);
                            [expectation fulfill];
                          }
                        }];

  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
979
// Full-duplex ping-pong over the V1 API: sends one request, waits for the matching response,
// then sends the next, for every request/response size pair. After the last response the
// request stream is closed and the call is expected to finish without error.
- (void)testPingPongRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];

  NSArray *requests = @[ @27182, @8, @1828, @45904 ];
  NSArray *responses = @[ @31415, @9, @2653, @58979 ];
  // Derived from the array so the checks below cannot drift from the data above.
  const int kExpectedResponses = (int)responses.count;

  GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];

  __block int index = 0;

  id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                               requestedResponseSize:responses[index]];
  [requestsBuffer writeValue:request];

  [_service fullDuplexCallWithRequestsWriter:requestsBuffer
                                eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
                                               NSError *error) {
                                  XCTAssertNil(error, @"Finished with unexpected error: %@", error);
                                  XCTAssertTrue(done || response,
                                                @"Event handler called without an event.");

                                  if (response) {
                                    XCTAssertLessThan(index, kExpectedResponses,
                                                      @"More than %i responses received.",
                                                      kExpectedResponses);
                                    // A failed assertion does not necessarily stop this handler
                                    // (XCTest continues after failures by default), so guard the
                                    // array lookups: an extra response must show up as a test
                                    // failure, not as an out-of-range exception.
                                    if (index < kExpectedResponses) {
                                      id expected = [RMTStreamingOutputCallResponse
                                          messageWithPayloadSize:responses[index]];
                                      XCTAssertEqualObjects(response, expected);
                                      index += 1;
                                      if (index < kExpectedResponses) {
                                        // Pong received: send the next ping.
                                        id request = [RMTStreamingOutputCallRequest
                                            messageWithPayloadSize:requests[index]
                                             requestedResponseSize:responses[index]];
                                        [requestsBuffer writeValue:request];
                                      } else {
                                        // Last pong received: close the request stream.
                                        [requestsBuffer writesFinishedWithError:nil];
                                      }
                                    }
                                  }

                                  if (done) {
                                    XCTAssertEqual(index, kExpectedResponses,
                                                   @"Received %i responses instead of %i.", index,
                                                   kExpectedResponses);
                                    [expectation fulfill];
                                  }
                                }];
  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
1026
// Full-duplex ping-pong over the V2 API: writes one request, waits for the matching response,
// then writes the next, for every request/response size pair. After the last response the call
// is finished and is expected to close without error.
- (void)testPingPongRPCWithV2API {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];

  NSArray *requests = @[ @27182, @8, @1828, @45904 ];
  NSArray *responses = @[ @31415, @9, @2653, @58979 ];
  // Derived from the array so the checks below cannot drift from the data above.
  const int kExpectedResponses = (int)responses.count;

  __block int index = 0;

  id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                               requestedResponseSize:responses[index]];
  GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  options.transportType = [[self class] transportType];
  options.transport = [[self class] transport];
  options.PEMRootCertificates = [[self class] PEMRootCertificates];
  options.hostNameOverride = [[self class] hostNameOverride];

  // __block: the message callback refers to the call, which is assigned only after the
  // callbacks have been created.
  __block GRPCStreamingProtoCall *call = [_service
      fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
                                            initWithInitialMetadataCallback:nil
                                            messageCallback:^(id message) {
                                              XCTAssertLessThan(
                                                  index, kExpectedResponses,
                                                  @"More than %i responses received.",
                                                  kExpectedResponses);
                                              // A failed assertion does not necessarily stop
                                              // this callback (XCTest continues after failures
                                              // by default); bail out before indexing past the
                                              // end of the arrays.
                                              if (index >= kExpectedResponses) {
                                                return;
                                              }
                                              id expected = [RMTStreamingOutputCallResponse
                                                  messageWithPayloadSize:responses[index]];
                                              XCTAssertEqualObjects(message, expected);
                                              index += 1;
                                              if (index < kExpectedResponses) {
                                                // Pong received: send the next ping.
                                                id request = [RMTStreamingOutputCallRequest
                                                    messageWithPayloadSize:requests[index]
                                                     requestedResponseSize:responses[index]];
                                                [call writeMessage:request];
                                              } else {
                                                // Last pong received: finish the call.
                                                [call finish];
                                              }
                                            }
                                            closeCallback:^(NSDictionary *trailingMetadata,
                                                            NSError *error) {
                                              XCTAssertNil(error,
                                                           @"Finished with unexpected error: %@",
                                                           error);
                                              XCTAssertEqual(
                                                  index, kExpectedResponses,
                                                  @"Received %i responses instead of %i.", index,
                                                  kExpectedResponses);
                                              [expectation fulfill];
                                            }]
                            callOptions:options];
  [call start];
  [call writeMessage:request];

  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
1080
// Full-duplex ping-pong over the V2 API with flowControlEnabled set: each response must be
// requested explicitly via receiveNextMessage, and a response must never arrive before the
// write callback for the corresponding request has been counted.
- (void)testPingPongRPCWithFlowControl {
  XCTAssertNotNil([[self class] host]);
  // Description names this test so a timeout is not attributed to the plain V2 ping-pong test.
  __weak XCTestExpectation *expectation =
      [self expectationWithDescription:@"PingPongWithFlowControl"];

  NSArray *requests = @[ @27182, @8, @1828, @45904 ];
  NSArray *responses = @[ @31415, @9, @2653, @58979 ];
  // Derived from the array so the checks below cannot drift from the data above.
  const int kExpectedResponses = (int)responses.count;

  __block int index = 0;

  id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                               requestedResponseSize:responses[index]];
  GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  options.transportType = [[self class] transportType];
  options.transport = [[self class] transport];
  options.PEMRootCertificates = [[self class] PEMRootCertificates];
  options.hostNameOverride = [[self class] hostNameOverride];
  options.flowControlEnabled = YES;
  // Incremented by the write-message callback below.
  __block int writeMessageCount = 0;

  // __block: the message callback refers to the call, which is assigned only after the
  // callbacks have been created.
  __block GRPCStreamingProtoCall *call = [_service
      fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
                                            initWithInitialMetadataCallback:nil
                                            messageCallback:^(id message) {
                                              XCTAssertLessThan(
                                                  index, kExpectedResponses,
                                                  @"More than %i responses received.",
                                                  kExpectedResponses);
                                              // A failed assertion does not necessarily stop
                                              // this callback (XCTest continues after failures
                                              // by default); bail out before indexing past the
                                              // end of the arrays.
                                              if (index >= kExpectedResponses) {
                                                return;
                                              }
                                              id expected = [RMTStreamingOutputCallResponse
                                                  messageWithPayloadSize:responses[index]];
                                              XCTAssertEqualObjects(message, expected);
                                              index += 1;
                                              if (index < kExpectedResponses) {
                                                id request = [RMTStreamingOutputCallRequest
                                                    messageWithPayloadSize:requests[index]
                                                     requestedResponseSize:responses[index]];
                                                XCTAssertLessThanOrEqual(
                                                    index, writeMessageCount,
                                                    @"Message received before writing message.");
                                                // Send the next ping and ask for its pong.
                                                [call writeMessage:request];
                                                [call receiveNextMessage];
                                              } else {
                                                // Last pong received: finish the call.
                                                [call finish];
                                              }
                                            }
                                            closeCallback:^(NSDictionary *trailingMetadata,
                                                            NSError *error) {
                                              XCTAssertNil(error,
                                                           @"Finished with unexpected error: %@",
                                                           error);
                                              XCTAssertEqual(
                                                  index, kExpectedResponses,
                                                  @"Received %i responses instead of %i.", index,
                                                  kExpectedResponses);
                                              [expectation fulfill];
                                            }
                                            writeMessageCallback:^{
                                              writeMessageCount++;
                                            }]
                            callOptions:options];
  [call start];
  [call receiveNextMessage];
  [call writeMessage:request];

  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
}
1144
// Opens a full-duplex call with an empty request writer and expects the event handler to report
// completion without an error.
- (void)testEmptyStreamRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *streamFinished = [self expectationWithDescription:@"EmptyStream"];

  GRXWriter *noRequests = [GRXWriter emptyWriter];
  void (^onEvent)(BOOL, RMTStreamingOutputCallResponse *, NSError *) =
      ^(BOOL done, RMTStreamingOutputCallResponse *response, NSError *error) {
        XCTAssertNil(error, @"Finished with unexpected error: %@", error);
        XCTAssert(done, @"Unexpected response: %@", response);
        [streamFinished fulfill];
      };
  [_service fullDuplexCallWithRequestsWriter:noRequests eventHandler:onEvent];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
1157
// Starts a client-streaming call that never sends anything, cancels it right away, and checks
// both the writer-state transitions of the call and the CANCELLED error passed to the handler.
- (void)testCancelAfterBeginRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *cancelReported =
      [self expectationWithDescription:@"CancelAfterBegin"];

  // Nothing is ever written to this pipe, so as a request writer it simply hangs.
  GRXBufferedPipe *silentPipe = [[GRXBufferedPipe alloc] init];

  void (^onFinished)(RMTStreamingInputCallResponse *, NSError *) =
      ^(RMTStreamingInputCallResponse *response, NSError *error) {
        XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
        [cancelReported fulfill];
      };
  GRPCProtoCall *call = [_service RPCToStreamingInputCallWithRequestsWriter:silentPipe
                                                                    handler:onFinished];

  // The call is expected to go NotStarted -> Started -> Finished.
  XCTAssertEqual(call.state, GRXWriterStateNotStarted);
  [call start];
  XCTAssertEqual(call.state, GRXWriterStateStarted);
  [call cancel];
  XCTAssertEqual(call.state, GRXWriterStateFinished);

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
1182
// V2 API variant of cancel-after-begin: starts a client-streaming call, cancels it without
// writing any message, and expects no response message and a close with the CANCELLED code.
- (void)testCancelAfterBeginRPCWithV2API {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *cancelReported =
      [self expectationWithDescription:@"CancelAfterBeginWithV2API"];

  // No message may be delivered for a call that was cancelled before sending anything.
  void (^onMessage)(id) = ^(id message) {
    XCTFail(@"Not expected to receive message");
  };
  void (^onClose)(NSDictionary *, NSError *) = ^(NSDictionary *trailingMetadata, NSError *error) {
    XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
    [cancelReported fulfill];
  };
  InteropTestsBlockCallbacks *responseHandler =
      [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                                                          messageCallback:onMessage
                                                            closeCallback:onClose];

  // The callbacks never reference the call, so a plain local is sufficient.
  GRPCStreamingProtoCall *call =
      [_service streamingInputCallWithResponseHandler:responseHandler callOptions:nil];
  [call start];
  [call cancel];

  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
1206
// Sends a single request on a full-duplex call, cancels the call from inside the handler as soon
// as the first response arrives, and expects the following event to be the final one and to
// carry the CANCELLED code.
- (void)testCancelAfterFirstResponseRPC {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *cancelReported =
      [self expectationWithDescription:@"CancelAfterFirstResponse"];

  // The pipe receives exactly one value and is never closed.
  GRXBufferedPipe *openPipe = [[GRXBufferedPipe alloc] init];
  id firstRequest = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
                                                    requestedResponseSize:@31415];
  [openPipe writeValue:firstRequest];

  __block BOOL sawFirstResponse = NO;
  // Declared before the handler because the handler cancels the call it belongs to.
  __block GRPCProtoCall *call = nil;

  void (^onEvent)(BOOL, RMTStreamingOutputCallResponse *, NSError *) =
      ^(BOOL done, RMTStreamingOutputCallResponse *response, NSError *error) {
        if (!sawFirstResponse) {
          // First event: must be a regular response; cancel the call from here.
          XCTAssertNil(error, @"Finished with unexpected error: %@", error);
          XCTAssertFalse(done, @"Finished without response");
          XCTAssertNotNil(response);
          sawFirstResponse = YES;
          [call cancel];
          return;
        }
        // Any later event: must be the terminal one, reporting the cancellation.
        XCTAssert(done, @"Unexpected extra response %@", response);
        XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
        [cancelReported fulfill];
      };

  call = [_service RPCToFullDuplexCallWithRequestsWriter:openPipe eventHandler:onEvent];
  [call start];
  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
1242
// V2 API variant of cancel-after-first-response: writes one request, cancels the call from the
// message callback when the first response arrives, and expects exactly one response followed
// by a close with the CANCELLED code.
- (void)testCancelAfterFirstResponseRPCWithV2API {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *callCompleted = [self expectationWithDescription:@"Call completed."];
  __weak XCTestExpectation *responseArrived =
      [self expectationWithDescription:@"Received response."];

  GRPCMutableCallOptions *callOptions = [[GRPCMutableCallOptions alloc] init];
  // transportType is set alongside transport for backwards compatibility.
  callOptions.transportType = self.class.transportType;
  callOptions.transport = [[self class] transport];
  callOptions.PEMRootCertificates = self.class.PEMRootCertificates;
  callOptions.hostNameOverride = [[self class] hostNameOverride];

  id firstRequest = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
                                                    requestedResponseSize:@31415];

  __block BOOL sawResponse = NO;
  // Declared before the callbacks because the message callback cancels the call it belongs to.
  __block GRPCStreamingProtoCall *call = nil;

  void (^onMessage)(id) = ^(id message) {
    // Only a single response may ever be delivered.
    XCTAssertFalse(sawResponse);
    sawResponse = YES;
    [call cancel];
    [responseArrived fulfill];
  };
  void (^onClose)(NSDictionary *, NSError *) = ^(NSDictionary *trailingMetadata, NSError *error) {
    XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
    [callCompleted fulfill];
  };
  InteropTestsBlockCallbacks *responseHandler =
      [[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
                                                          messageCallback:onMessage
                                                            closeCallback:onClose];

  call = [_service fullDuplexCallWithResponseHandler:responseHandler callOptions:callOptions];
  [call start];
  [call writeMessage:firstRequest];
  [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
}
1281
// Starts a full-duplex V2 call, writes one request and cancels right away from the test thread.
// Any delivered response fails the test; the close callback must report GRPC_STATUS_CANCELLED
// before TEST_TIMEOUT elapses.
1282 - (void)testCancelAfterFirstRequestWithV2API {
1283   XCTAssertNotNil([[self class] host]);
1284   __weak XCTestExpectation *completionExpectation =
1285       [self expectationWithDescription:@"Call completed."];
1286
1287   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1288   // For backwards compatibility
1289   options.transportType = self.class.transportType;
1290   options.transport = [[self class] transport];
1291   options.PEMRootCertificates = self.class.PEMRootCertificates;
1292   options.hostNameOverride = [[self class] hostNameOverride];
1293
1294   id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
1295                                                requestedResponseSize:@31415];
1296
1297   __block GRPCStreamingProtoCall *call = [_service
1298       fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1299                                             initWithInitialMetadataCallback:nil
1300                                             messageCallback:^(id message) {
1301                                               XCTFail(@"Received unexpected response.");
1302                                             }
1303                                             closeCallback:^(NSDictionary *trailingMetadata,
1304                                                             NSError *error) {
1305                                               XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1306                                               [completionExpectation fulfill];
1307                                             }]
1308                             callOptions:options];
1309   [call start];
1310   [call writeMessage:request];
1311   [call cancel];
1312   [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
1313 }
1314
// Issues an empty unary call, and from its completion handler calls the deprecated
// +[GRPCCall closeOpenConnections] before issuing a second empty call on the same service.
// Both calls must finish without error; the expectation is fulfilled by the second one and is
// awaited for up to STREAMING_CALL_TEST_TIMEOUT.
1315 - (void)testRPCAfterClosingOpenConnections {
1316   XCTAssertNotNil([[self class] host]);
1317   __weak XCTestExpectation *expectation =
1318       [self expectationWithDescription:@"RPC after closing connection"];
1319
1320   GPBEmpty *request = [GPBEmpty message];
1321
1322   [_service
1323       emptyCallWithRequest:request
1324                    handler:^(GPBEmpty *response, NSError *error) {
1325                      XCTAssertNil(error, @"First RPC finished with unexpected error: %@", error);
1326
                          // closeOpenConnections is deprecated; the warning is silenced only
                          // around this one call.
1327 #pragma clang diagnostic push
1328 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
1329                      [GRPCCall closeOpenConnections];
1330 #pragma clang diagnostic pop
1331
1332                      [self->_service
1333                          emptyCallWithRequest:request
1334                                       handler:^(GPBEmpty *response, NSError *error) {
1335                                         XCTAssertNil(
1336                                             error, @"Second RPC finished with unexpected error: %@",
1337                                             error);
1338                                         [expectation fulfill];
1339                                       }];
1340                    }];
1341
1342   [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
1343 }
1344
// Sends a unary request with gzip set as the default compression method for the test host and
// with expectCompressed set, then checks that the response equals a compressable payload of
// 314159 zero bytes. The test is a no-op against the remote interop server (see below).
1345 - (void)testCompressedUnaryRPC {
1346   // This test needs to be disabled for remote test because interop server grpc-test
1347   // does not support compression.
1348   if (isRemoteInteropTest([[self class] host])) {
1349     return;
1350   }
1351   XCTAssertNotNil([[self class] host]);
       // The description names this test so a timeout is not reported as "LargeUnary".
1352   __weak XCTestExpectation *expectation = [self expectationWithDescription:@"CompressedUnary"];
1353
1354   RMTSimpleRequest *request = [RMTSimpleRequest message];
1355   request.responseType = RMTPayloadType_Compressable;
1356   request.responseSize = 314159;
1357   request.payload.body = [NSMutableData dataWithLength:271828];
1358   request.expectCompressed.value = YES;
1359   [GRPCCall setDefaultCompressMethod:GRPCCompressGzip forhost:[[self class] host]];
1360
1361   [_service unaryCallWithRequest:request
1362                          handler:^(RMTSimpleResponse *response, NSError *error) {
1363                            XCTAssertNil(error, @"Finished with unexpected error: %@", error);
1364
1365                            RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
1366                            expectedResponse.payload.type = RMTPayloadType_Compressable;
1367                            expectedResponse.payload.body = [NSMutableData dataWithLength:314159];
1368                            XCTAssertEqualObjects(response, expectedResponse);
1369
1370                            [expectation fulfill];
1371                          }];
1372
1373   [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
1374 }
1375
// Opens a full-duplex V2 call with keepaliveInterval = 1.5 and keepaliveTimeout = 0 and expects
// the call to close with a non-nil error whose code is GRPC_STATUS_UNAVAILABLE within
// TEST_TIMEOUT. Returns early without testing anything when the transport is Cronet.
// NOTE(review): presumably the zero timeout makes every keepalive ping fail; confirm against
// the GRPCCallOptions documentation.
1376 - (void)testKeepaliveWithV2API {
1377   XCTAssertNotNil([[self class] host]);
1378   if ([[self class] transport] == gGRPCCoreCronetID) {
1379     // Cronet does not support keepalive
1380     return;
1381   }
1382   __weak XCTestExpectation *expectation = [self expectationWithDescription:@"Keepalive"];
1383
1384   NSNumber *kRequestSize = @27182;
1385   NSNumber *kResponseSize = @31415;
1386
1387   id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:kRequestSize
1388                                                requestedResponseSize:kResponseSize];
1389   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1390   options.transportType = [[self class] transportType];
1391   options.transport = [[self class] transport];
1392   options.PEMRootCertificates = [[self class] PEMRootCertificates];
1393   options.hostNameOverride = [[self class] hostNameOverride];
1394   options.keepaliveInterval = 1.5;
1395   options.keepaliveTimeout = 0;
1396
1397   __block GRPCStreamingProtoCall *call = [_service
1398       fullDuplexCallWithResponseHandler:
1399           [[InteropTestsBlockCallbacks alloc]
1400               initWithInitialMetadataCallback:nil
1401                               messageCallback:nil
1402                                 closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
1403                                   XCTAssertNotNil(error);
                                       // NSInteger is not `long` on every target, so cast it
                                       // to match the %ld specifier.
1404                                   XCTAssertEqual(
1405                                       error.code, GRPC_STATUS_UNAVAILABLE,
1406                                       @"Received status %ld instead of UNAVAILABLE (14).",
1407                                       (long)error.code);
1408                                   [expectation fulfill];
1409                                 }]
1410                             callOptions:options];
       // NOTE(review): unlike the other V2 tests in this file, the message is written before the
       // call is started; confirm this ordering is intentional.
1411   [call writeMessage:request];
1412   [call start];
1413
1414   [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
       // NOTE(review): finish is sent after the wait, i.e. normally after the call has already
       // closed with an error; confirm it is still needed.
1415   [call finish];
1416 }
1417
// Runs a four-message ping-pong over a full-duplex V2 call whose options install a single
// DefaultInterceptorFactory. Each response must equal the payload size requested for that
// round; after the fourth response the call is finished and must close without error, with
// exactly four responses counted.
// NOTE(review): DefaultInterceptorFactory is defined outside this chunk; presumably it produces
// a pass-through interceptor — confirm.
1418 - (void)testDefaultInterceptor {
1419   XCTAssertNotNil([[self class] host]);
1420   __weak XCTestExpectation *expectation =
1421       [self expectationWithDescription:@"testDefaultInterceptor"];
1422
1423   NSArray *requests = @[ @27182, @8, @1828, @45904 ];
1424   NSArray *responses = @[ @31415, @9, @2653, @58979 ];
1425
       // Number of responses received so far; also the index of the next request/response pair.
1426   __block int index = 0;
1427
1428   id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1429                                                requestedResponseSize:responses[index]];
1430   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1431   // For backwards compatibility
1432   options.transportType = [[self class] transportType];
1433   options.transport = [[self class] transport];
1434   options.PEMRootCertificates = [[self class] PEMRootCertificates];
1435   options.hostNameOverride = [[self class] hostNameOverride];
1436   options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init] ];
1437
1438   __block GRPCStreamingProtoCall *call = [_service
1439       fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1440                                             initWithInitialMetadataCallback:nil
1441                                             messageCallback:^(id message) {
1442                                               XCTAssertLessThan(index, 4,
1443                                                                 @"More than 4 responses received.");
1444                                               id expected = [RMTStreamingOutputCallResponse
1445                                                   messageWithPayloadSize:responses[index]];
1446                                               XCTAssertEqualObjects(message, expected);
1447                                               index += 1;
1448                                               if (index < 4) {
1449                                                 id request = [RMTStreamingOutputCallRequest
1450                                                     messageWithPayloadSize:requests[index]
1451                                                      requestedResponseSize:responses[index]];
1452                                                 [call writeMessage:request];
1453                                               } else {
1454                                                 [call finish];
1455                                               }
1456                                             }
1457                                             closeCallback:^(NSDictionary *trailingMetadata,
1458                                                             NSError *error) {
1459                                               XCTAssertNil(error,
1460                                                            @"Finished with unexpected error: %@",
1461                                                            error);
1462                                               XCTAssertEqual(index, 4,
1463                                                              @"Received %i responses instead of 4.",
1464                                                              index);
1465                                               [expectation fulfill];
1466                                             }]
1467                             callOptions:options];
1468   [call start];
1469   [call writeMessage:request];
1470
1471   [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
1472 }
1473
// Runs a four-message ping-pong over a flow-controlled full-duplex V2 call through a
// HookInterceptorFactory whose hooks each bump a counter and then pass the event on unchanged.
// The start hook additionally checks the request's host, path and safety. After the call closes
// without error, the test checks how often each hook fired: 1 start, 4 writes, 1 finish,
// 4 receive-next-message requests, 1 response header, 4 response messages, 1 close and
// 4 did-write notifications, plus 4 user-level write callbacks.
1474 - (void)testLoggingInterceptor {
1475   XCTAssertNotNil([[self class] host]);
1476   __weak XCTestExpectation *expectation =
1477       [self expectationWithDescription:@"testLoggingInterceptor"];
1478
       // One counter per interceptor hook; incremented inside the hooks, checked after the wait.
1479   __block NSUInteger startCount = 0;
1480   __block NSUInteger writeDataCount = 0;
1481   __block NSUInteger finishCount = 0;
1482   __block NSUInteger receiveNextMessageCount = 0;
1483   __block NSUInteger responseHeaderCount = 0;
1484   __block NSUInteger responseDataCount = 0;
1485   __block NSUInteger responseCloseCount = 0;
1486   __block NSUInteger didWriteDataCount = 0;
1487   id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
1488       initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
1489       startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
1490                   GRPCInterceptorManager *manager) {
1491         startCount++;
1492         XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
1493         XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
1494         XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
1495         [manager startNextInterceptorWithRequest:[requestOptions copy]
1496                                      callOptions:[callOptions copy]];
1497       }
1498       writeDataHook:^(id data, GRPCInterceptorManager *manager) {
1499         writeDataCount++;
1500         [manager writeNextInterceptorWithData:data];
1501       }
1502       finishHook:^(GRPCInterceptorManager *manager) {
1503         finishCount++;
1504         [manager finishNextInterceptor];
1505       }
1506       receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
1507         receiveNextMessageCount++;
1508         [manager receiveNextInterceptorMessages:numberOfMessages];
1509       }
1510       responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
1511         responseHeaderCount++;
1512         [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
1513       }
1514       responseDataHook:^(id data, GRPCInterceptorManager *manager) {
1515         responseDataCount++;
1516         [manager forwardPreviousInterceptorWithData:data];
1517       }
1518       responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
1519                           GRPCInterceptorManager *manager) {
1520         responseCloseCount++;
1521         [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error];
1522       }
1523       didWriteDataHook:^(GRPCInterceptorManager *manager) {
1524         didWriteDataCount++;
1525         [manager forwardPreviousInterceptorDidWriteData];
1526       }];
1527
1528   NSArray *requests = @[ @1, @2, @3, @4 ];
1529   NSArray *responses = @[ @1, @2, @3, @4 ];
1530
1531   __block int messageIndex = 0;
1532
1533   id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[messageIndex]
1534                                                requestedResponseSize:responses[messageIndex]];
1535   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1536   // For backwards compatibility
1537   options.transportType = [[self class] transportType];
1538   options.transport = [[self class] transport];
1539   options.PEMRootCertificates = [[self class] PEMRootCertificates];
1540   options.hostNameOverride = [[self class] hostNameOverride];
1541   options.flowControlEnabled = YES;
1542   options.interceptorFactories = @[ factory ];
1543
       // Counts invocations of the user-level write callback registered below.
1544   __block int writeMessageCount = 0;
1545
1546   __block GRPCStreamingProtoCall *call = [_service
1547       fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1548                                             initWithInitialMetadataCallback:nil
1549                                             messageCallback:^(id message) {
1550                                               XCTAssertLessThan(messageIndex, 4,
1551                                                                 @"More than 4 responses received.");
1552                                               id expected = [RMTStreamingOutputCallResponse
1553                                                   messageWithPayloadSize:responses[messageIndex]];
1554                                               XCTAssertEqualObjects(message, expected);
1555                                               messageIndex += 1;
1556                                               if (messageIndex < 4) {
1557                                                 id request = [RMTStreamingOutputCallRequest
1558                                                     messageWithPayloadSize:requests[messageIndex]
1559                                                      requestedResponseSize:responses[messageIndex]];
1560                                                 XCTAssertLessThanOrEqual(
1561                                                     messageIndex, writeMessageCount,
1562                                                     @"Message received before writing message.");
1563                                                 [call writeMessage:request];
1564                                                 [call receiveNextMessage];
1565                                               } else {
1566                                                 [call finish];
1567                                               }
1568                                             }
1569                                             closeCallback:^(NSDictionary *trailingMetadata,
1570                                                             NSError *error) {
1571                                               XCTAssertNil(error,
1572                                                            @"Finished with unexpected error: %@",
1573                                                            error);
1574                                               XCTAssertEqual(messageIndex, 4,
1575                                                              @"Received %i responses instead of 4.",
1576                                                              messageIndex);
1577                                               [expectation fulfill];
1578                                             }
1579                                             writeMessageCallback:^{
1580                                               writeMessageCount++;
1581                                             }]
1582                             callOptions:options];
1583   [call start];
1584   [call receiveNextMessage];
1585   [call writeMessage:request];
1586
1587   [self waitForExpectationsWithTimeout:TEST_TIMEOUT handler:nil];
1588   XCTAssertEqual(writeMessageCount, 4);
1589   XCTAssertEqual(startCount, 1);
1590   XCTAssertEqual(writeDataCount, 4);
1591   XCTAssertEqual(finishCount, 1);
1592   XCTAssertEqual(receiveNextMessageCount, 4);
1593   XCTAssertEqual(responseHeaderCount, 1);
1594   XCTAssertEqual(responseDataCount, 4);
1595   XCTAssertEqual(responseCloseCount, 1);
1596   XCTAssertEqual(didWriteDataCount, 4);
1597 }
1598
1599 // Chain a default interceptor and a hook interceptor which, after one write, cancels the call
1600 // under the hood but keeps forwarding further data to the user.
// The write hook passes the first request through; on the second write it cancels the next
// interceptor and from then on answers every write itself with a locally built response. The
// finish hook replies with a successful close and shuts the manager down, while the real close
// (expected to be GRPC_STATUS_CANCELLED with nil trailing metadata) is counted but not forwarded.
// The user-level call must therefore see four responses and a close without error.
1601 - (void)testHijackingInterceptor {
1602   NSUInteger kCancelAfterWrites = 1;
1603   XCTAssertNotNil([[self class] host]);
1604   __weak XCTestExpectation *expectUserCallComplete =
1605       [self expectationWithDescription:@"User call completed."];
1606
1607   NSArray *responses = @[ @1, @2, @3, @4 ];
       // Responses seen by the user so far; read by the write hook and advanced by the user's
       // message callback.
1608   __block int index = 0;
1609
1610   __block NSUInteger startCount = 0;
1611   __block NSUInteger writeDataCount = 0;
1612   __block NSUInteger finishCount = 0;
1613   __block NSUInteger responseHeaderCount = 0;
1614   __block NSUInteger responseDataCount = 0;
1615   __block NSUInteger responseCloseCount = 0;
1616   id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
1617       initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
1618       startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
1619                   GRPCInterceptorManager *manager) {
1620         startCount++;
1621         [manager startNextInterceptorWithRequest:[requestOptions copy]
1622                                      callOptions:[callOptions copy]];
1623       }
1624       writeDataHook:^(id data, GRPCInterceptorManager *manager) {
1625         writeDataCount++;
1626         if (index < kCancelAfterWrites) {
1627           [manager writeNextInterceptorWithData:data];
1628         } else if (index == kCancelAfterWrites) {
1629           [manager cancelNextInterceptor];
1630           [manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
1631                                                           messageWithPayloadSize:responses[index]]
1632                                                           data]];
1633         } else {  // (index > kCancelAfterWrites)
1634           [manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
1635                                                           messageWithPayloadSize:responses[index]]
1636                                                           data]];
1637         }
1638       }
1639       finishHook:^(GRPCInterceptorManager *manager) {
1640         finishCount++;
1641         // finish must happen after the hijacking, so directly reply with a close
1642         [manager forwardPreviousInterceptorCloseWithTrailingMetadata:@{@"grpc-status" : @"0"}
1643                                                                error:nil];
1644         [manager shutDown];
1645       }
1646       receiveNextMessagesHook:nil
1647       responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
1648         responseHeaderCount++;
1649         [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
1650       }
1651       responseDataHook:^(id data, GRPCInterceptorManager *manager) {
1652         responseDataCount++;
1653         [manager forwardPreviousInterceptorWithData:data];
1654       }
1655       responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
1656                           GRPCInterceptorManager *manager) {
1657         responseCloseCount++;
1658         // since we canceled the call, it should return cancel error
1659         XCTAssertNil(trailingMetadata);
1660         XCTAssertNotNil(error);
1661         XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
1662       }
1663       didWriteDataHook:nil];
1664
1665   NSArray *requests = @[ @1, @2, @3, @4 ];
1666
1667   id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1668                                                requestedResponseSize:responses[index]];
1669   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1670   // For backwards compatibility
1671   options.transportType = [[self class] transportType];
1672   options.transport = [[self class] transport];
1673   options.PEMRootCertificates = [[self class] PEMRootCertificates];
1674   options.hostNameOverride = [[self class] hostNameOverride];
1675   options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init], factory ];
1676
1677   __block GRPCStreamingProtoCall *call = [_service
1678       fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1679                                             initWithInitialMetadataCallback:nil
1680                                             messageCallback:^(id message) {
1681                                               XCTAssertLessThan(index, 4,
1682                                                                 @"More than 4 responses received.");
1683                                               id expected = [RMTStreamingOutputCallResponse
1684                                                   messageWithPayloadSize:responses[index]];
1685                                               XCTAssertEqualObjects(message, expected);
1686                                               index += 1;
1687                                               if (index < 4) {
1688                                                 id request = [RMTStreamingOutputCallRequest
1689                                                     messageWithPayloadSize:requests[index]
1690                                                      requestedResponseSize:responses[index]];
1691                                                 [call writeMessage:request];
1692                                                 [call receiveNextMessage];
1693                                               } else {
1694                                                 [call finish];
1695                                               }
1696                                             }
1697                                             closeCallback:^(NSDictionary *trailingMetadata,
1698                                                             NSError *error) {
1699                                               XCTAssertNil(error,
1700                                                            @"Finished with unexpected error: %@",
1701                                                            error);
1702                                               XCTAssertEqual(index, 4,
1703                                                              @"Received %i responses instead of 4.",
1704                                                              index);
1705                                               [expectUserCallComplete fulfill];
1706                                             }]
1707                             callOptions:options];
1708   [call start];
1709   [call receiveNextMessage];
1710   [call writeMessage:request];
1711
1712   [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
       // Only the first exchange reaches the wire, so the hook sees all four writes but just one
       // response message.
1713   XCTAssertEqual(startCount, 1);
1714   XCTAssertEqual(writeDataCount, 4);
1715   XCTAssertEqual(finishCount, 1);
1716   XCTAssertEqual(responseHeaderCount, 1);
1717   XCTAssertEqual(responseDataCount, 1);
1718   XCTAssertEqual(responseCloseCount, 1);
1719 }
1720
// Installs counting, pass-through hooks on globalInterceptorFactory, enables it, and runs a
// four-message ping-pong over a flow-controlled full-duplex V2 call whose options list no
// interceptor factories of their own. After the call closes without error, the hook counters
// must read 1 start, 4 writes, 1 finish, 4 receive-next-message requests, 1 response header,
// 4 response messages, 1 close and 4 did-write notifications. The factory is disabled again on
// the last line.
// NOTE(review): globalInterceptorFactory is declared outside this chunk. Because it is only
// disabled at the very end, it stays enabled if the method is aborted earlier (for example when
// continueAfterFailure is NO) — confirm whether teardown resets it.
1721 - (void)testGlobalInterceptor {
1722   XCTAssertNotNil([[self class] host]);
1723   __weak XCTestExpectation *expectation =
1724       [self expectationWithDescription:@"testGlobalInterceptor"];
1725
1726   __block NSUInteger startCount = 0;
1727   __block NSUInteger writeDataCount = 0;
1728   __block NSUInteger finishCount = 0;
1729   __block NSUInteger receiveNextMessageCount = 0;
1730   __block NSUInteger responseHeaderCount = 0;
1731   __block NSUInteger responseDataCount = 0;
1732   __block NSUInteger responseCloseCount = 0;
1733   __block NSUInteger didWriteDataCount = 0;
1734   [globalInterceptorFactory
1735       setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
1736                      GRPCInterceptorManager *manager) {
1737         startCount++;
1738         XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
1739         XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
1740         XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
1741         [manager startNextInterceptorWithRequest:[requestOptions copy]
1742                                      callOptions:[callOptions copy]];
1743       }
1744       writeDataHook:^(id data, GRPCInterceptorManager *manager) {
1745         writeDataCount++;
1746         [manager writeNextInterceptorWithData:data];
1747       }
1748       finishHook:^(GRPCInterceptorManager *manager) {
1749         finishCount++;
1750         [manager finishNextInterceptor];
1751       }
1752       receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
1753         receiveNextMessageCount++;
1754         [manager receiveNextInterceptorMessages:numberOfMessages];
1755       }
1756       responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
1757         responseHeaderCount++;
1758         [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
1759       }
1760       responseDataHook:^(id data, GRPCInterceptorManager *manager) {
1761         responseDataCount++;
1762         [manager forwardPreviousInterceptorWithData:data];
1763       }
1764       responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
1765                           GRPCInterceptorManager *manager) {
1766         responseCloseCount++;
1767         [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error];
1768       }
1769       didWriteDataHook:^(GRPCInterceptorManager *manager) {
1770         didWriteDataCount++;
1771         [manager forwardPreviousInterceptorDidWriteData];
1772       }];
1773
1774   NSArray *requests = @[ @1, @2, @3, @4 ];
1775   NSArray *responses = @[ @1, @2, @3, @4 ];
1776
1777   __block int index = 0;
1778
1779   id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
1780                                                requestedResponseSize:responses[index]];
1781   GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
1782   // For backwards compatibility
1783   options.transportType = [[self class] transportType];
1784   options.transport = [[self class] transport];
1785   options.PEMRootCertificates = [[self class] PEMRootCertificates];
1786   options.hostNameOverride = [[self class] hostNameOverride];
1787   options.flowControlEnabled = YES;
1788   globalInterceptorFactory.enabled = YES;
1789
1790   __block int writeMessageCount = 0;
       // The message callback only counts responses; it does not compare their contents.
1791   __block GRPCStreamingProtoCall *call = [_service
1792       fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
1793                                             initWithInitialMetadataCallback:nil
1794                                             messageCallback:^(id message) {
1795                                               XCTAssertLessThan(index, 4,
1796                                                                 @"More than 4 responses received.");
1797                                               index += 1;
1798                                               if (index < 4) {
1799                                                 id request = [RMTStreamingOutputCallRequest
1800                                                     messageWithPayloadSize:requests[index]
1801                                                      requestedResponseSize:responses[index]];
1802                                                 XCTAssertLessThanOrEqual(
1803                                                     index, writeMessageCount,
1804                                                     @"Message received before writing message.");
1805                                                 [call writeMessage:request];
1806                                                 [call receiveNextMessage];
1807                                               } else {
1808                                                 [call finish];
1809                                               }
1810                                             }
1811                                             closeCallback:^(NSDictionary *trailingMetadata,
1812                                                             NSError *error) {
1813                                               XCTAssertNil(error,
1814                                                            @"Finished with unexpected error: %@",
1815                                                            error);
1816                                               [expectation fulfill];
1817                                             }
1818                                             writeMessageCallback:^{
1819                                               writeMessageCount++;
1820                                             }]
1821                             callOptions:options];
1822   [call start];
1823   [call receiveNextMessage];
1824   [call writeMessage:request];
1825
1826   [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];
1827   XCTAssertEqual(startCount, 1);
1828   XCTAssertEqual(writeDataCount, 4);
1829   XCTAssertEqual(finishCount, 1);
1830   XCTAssertEqual(receiveNextMessageCount, 4);
1831   XCTAssertEqual(responseHeaderCount, 1);
1832   XCTAssertEqual(responseDataCount, 4);
1833   XCTAssertEqual(responseCloseCount, 1);
1834   XCTAssertEqual(didWriteDataCount, 4);
1835   globalInterceptorFactory.enabled = NO;
1836 }
1837
// Checks that +[GRPCCall2 registerGlobalInterceptor:] raises an exception when it is handed
// another interceptor factory.
// NOTE(review): presumably a global interceptor was already registered earlier in this suite, which
// is what makes this registration "the second time" -- confirm against the class-level setup.
- (void)testConflictingGlobalInterceptors {
  dispatch_queue_t hookQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);

  // No hook is needed: the factory only serves as the argument of the registration attempt.
  id<GRPCInterceptorFactory> conflictingFactory =
      [[HookInterceptorFactory alloc] initWithDispatchQueue:hookQueue
                                                  startHook:nil
                                              writeDataHook:nil
                                                 finishHook:nil
                                    receiveNextMessagesHook:nil
                                         responseHeaderHook:nil
                                           responseDataHook:nil
                                          responseCloseHook:nil
                                           didWriteDataHook:nil];

  @try {
    [GRPCCall2 registerGlobalInterceptor:conflictingFactory];
    // Reaching this line means the registration did not raise.
    XCTFail(@"Did not receive an exception when registering global interceptor the second time");
  } @catch (NSException *registrationException) {
    // Do nothing; test passes
  }
}
1856
// Drives a four-message FullDuplexCall through both a per-call interceptor (installed through
// the call options) and the global interceptor factory, and verifies that every hook of both
// interceptors is invoked the expected number of times and that the call finishes without error.
- (void)testInterceptorAndGlobalInterceptor {
  XCTAssertNotNil([[self class] host]);
  __weak XCTestExpectation *expectation =
      [self expectationWithDescription:@"testInterceptorAndGlobalInterceptor"];

  // Invocation counters for the hooks of the per-call interceptor.
  __block NSUInteger startCount = 0;
  __block NSUInteger writeDataCount = 0;
  __block NSUInteger finishCount = 0;
  __block NSUInteger receiveNextMessageCount = 0;
  __block NSUInteger responseHeaderCount = 0;
  __block NSUInteger responseDataCount = 0;
  __block NSUInteger responseCloseCount = 0;
  __block NSUInteger didWriteDataCount = 0;

  // Each hook counts its invocation and then passes the event on unchanged to the next
  // (request direction) or previous (response direction) interceptor.
  id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
      initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
      startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                  GRPCInterceptorManager *manager) {
        startCount++;
        XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
        XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
        XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
        [manager startNextInterceptorWithRequest:[requestOptions copy]
                                     callOptions:[callOptions copy]];
      }
      writeDataHook:^(id data, GRPCInterceptorManager *manager) {
        writeDataCount++;
        [manager writeNextInterceptorWithData:data];
      }
      finishHook:^(GRPCInterceptorManager *manager) {
        finishCount++;
        [manager finishNextInterceptor];
      }
      receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
        receiveNextMessageCount++;
        [manager receiveNextInterceptorMessages:numberOfMessages];
      }
      responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
        responseHeaderCount++;
        [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
      }
      responseDataHook:^(id data, GRPCInterceptorManager *manager) {
        responseDataCount++;
        [manager forwardPreviousInterceptorWithData:data];
      }
      responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
                          GRPCInterceptorManager *manager) {
        responseCloseCount++;
        [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error];
      }
      didWriteDataHook:^(GRPCInterceptorManager *manager) {
        didWriteDataCount++;
        [manager forwardPreviousInterceptorDidWriteData];
      }];

  // Invocation counters for the hooks of the global interceptor.
  __block NSUInteger globalStartCount = 0;
  __block NSUInteger globalWriteDataCount = 0;
  __block NSUInteger globalFinishCount = 0;
  __block NSUInteger globalReceiveNextMessageCount = 0;
  __block NSUInteger globalResponseHeaderCount = 0;
  __block NSUInteger globalResponseDataCount = 0;
  __block NSUInteger globalResponseCloseCount = 0;
  __block NSUInteger globalDidWriteDataCount = 0;

  // Install the same count-and-forward hooks on the shared global interceptor factory.
  [globalInterceptorFactory
      setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
                     GRPCInterceptorManager *manager) {
        globalStartCount++;
        XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
        XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
        XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
        [manager startNextInterceptorWithRequest:[requestOptions copy]
                                     callOptions:[callOptions copy]];
      }
      writeDataHook:^(id data, GRPCInterceptorManager *manager) {
        globalWriteDataCount++;
        [manager writeNextInterceptorWithData:data];
      }
      finishHook:^(GRPCInterceptorManager *manager) {
        globalFinishCount++;
        [manager finishNextInterceptor];
      }
      receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
        globalReceiveNextMessageCount++;
        [manager receiveNextInterceptorMessages:numberOfMessages];
      }
      responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
        globalResponseHeaderCount++;
        [manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
      }
      responseDataHook:^(id data, GRPCInterceptorManager *manager) {
        globalResponseDataCount++;
        [manager forwardPreviousInterceptorWithData:data];
      }
      responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
                          GRPCInterceptorManager *manager) {
        globalResponseCloseCount++;
        [manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata error:error];
      }
      didWriteDataHook:^(GRPCInterceptorManager *manager) {
        globalDidWriteDataCount++;
        [manager forwardPreviousInterceptorDidWriteData];
      }];

  // Payload size and requested response size of the four request messages.
  NSArray *requests = @[ @1, @2, @3, @4 ];
  NSArray *responses = @[ @1, @2, @3, @4 ];

  // Index of the request currently in flight; advanced once per received response.
  __block int index = 0;

  id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
                                               requestedResponseSize:responses[index]];
  GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
  // For backwards compatibility
  options.transportType = [[self class] transportType];
  options.transport = [[self class] transport];
  options.PEMRootCertificates = [[self class] PEMRootCertificates];
  options.hostNameOverride = [[self class] hostNameOverride];
  options.flowControlEnabled = YES;
  options.interceptorFactories = @[ factory ];
  globalInterceptorFactory.enabled = YES;

  // Number of writes acknowledged through the write-message callback so far.
  __block int writeMessageCount = 0;
  __block GRPCStreamingProtoCall *call = [_service
      fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
                                            initWithInitialMetadataCallback:nil
                                            messageCallback:^(id message) {
                                              // Only four requests are sent, so a fifth response
                                              // would be a protocol error.
                                              XCTAssertLessThan(index, 4,
                                                                @"More than 4 responses received.");
                                              index += 1;
                                              if (index < 4) {
                                                id request = [RMTStreamingOutputCallRequest
                                                    messageWithPayloadSize:requests[index]
                                                     requestedResponseSize:responses[index]];
                                                XCTAssertLessThanOrEqual(
                                                    index, writeMessageCount,
                                                    @"Message received before writing message.");
                                                [call writeMessage:request];
                                                [call receiveNextMessage];
                                              } else {
                                                [call finish];
                                              }
                                            }
                                            closeCallback:^(NSDictionary *trailingMetadata,
                                                            NSError *error) {
                                              // Report a failed RPC directly instead of leaving it
                                              // to show up only as mismatched hook counters.
                                              XCTAssertNil(error,
                                                           @"Finished with unexpected error: %@",
                                                           error);
                                              [expectation fulfill];
                                            }
                                            writeMessageCallback:^{
                                              writeMessageCount++;
                                            }]
                            callOptions:options];
  // Start the call, ask for the first response and send the first request; the message
  // callback above sends the remaining requests and finishes the call.
  [call start];
  [call receiveNextMessage];
  [call writeMessage:request];

  [self waitForExpectationsWithTimeout:STREAMING_CALL_TEST_TIMEOUT handler:nil];

  // Per-call interceptor: one start/finish/header/close, four of each per-message event.
  XCTAssertEqual(startCount, 1);
  XCTAssertEqual(writeDataCount, 4);
  XCTAssertEqual(finishCount, 1);
  XCTAssertEqual(receiveNextMessageCount, 4);
  XCTAssertEqual(responseHeaderCount, 1);
  XCTAssertEqual(responseDataCount, 4);
  XCTAssertEqual(responseCloseCount, 1);
  XCTAssertEqual(didWriteDataCount, 4);
  // The global interceptor must have observed exactly the same events.
  XCTAssertEqual(globalStartCount, 1);
  XCTAssertEqual(globalWriteDataCount, 4);
  XCTAssertEqual(globalFinishCount, 1);
  XCTAssertEqual(globalReceiveNextMessageCount, 4);
  XCTAssertEqual(globalResponseHeaderCount, 1);
  XCTAssertEqual(globalResponseDataCount, 4);
  XCTAssertEqual(globalResponseCloseCount, 1);
  XCTAssertEqual(globalDidWriteDataCount, 4);
  globalInterceptorFactory.enabled = NO;
}
2028
2029 @end