74a1b47ba6cf33fd4c5d81bb1624cfdd1dcf0ecc
[platform/upstream/grpc.git] / src / objective-c / GRPCClient / GRPCCall.m
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #import "GRPCCall.h"
20
21 #import "GRPCCall+OAuth2.h"
22
23 #import <RxLibrary/GRXBufferedPipe.h>
24 #import <RxLibrary/GRXConcurrentWriteable.h>
25 #import <RxLibrary/GRXImmediateSingleWriter.h>
26 #import <RxLibrary/GRXWriter+Immediate.h>
27 #include <grpc/grpc.h>
28 #include <grpc/support/time.h>
29
30 #import "GRPCCallOptions.h"
31 #import "private/GRPCChannelPool.h"
32 #import "private/GRPCCompletionQueue.h"
33 #import "private/GRPCConnectivityMonitor.h"
34 #import "private/GRPCHost.h"
35 #import "private/GRPCRequestHeaders.h"
36 #import "private/GRPCWrappedCall.h"
37 #import "private/NSData+GRPC.h"
38 #import "private/NSDictionary+GRPC.h"
39 #import "private/NSError+GRPC.h"
40
41 // At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
42 // SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
43 // and RECV_STATUS_ON_CLIENT.
44 NSInteger kMaxClientBatch = 6;
45
46 NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
47 NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";
48 static NSMutableDictionary *callFlags;
49
50 static NSString *const kAuthorizationHeader = @"authorization";
51 static NSString *const kBearerPrefix = @"Bearer ";
52
53 const char *kCFStreamVarName = "grpc_cfstream";
54
55 @interface GRPCCall ()<GRXWriteable>
56 // Make them read-write.
57 @property(atomic, strong) NSDictionary *responseHeaders;
58 @property(atomic, strong) NSDictionary *responseTrailers;
59
60 - (instancetype)initWithHost:(NSString *)host
61                         path:(NSString *)path
62                   callSafety:(GRPCCallSafety)safety
63               requestsWriter:(GRXWriter *)requestsWriter
64                  callOptions:(GRPCCallOptions *)callOptions;
65
66 @end
67
68 @implementation GRPCRequestOptions
69
70 - (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
71   NSAssert(host.length != 0 && path.length != 0, @"host and path cannot be empty");
72   if (host.length == 0 || path.length == 0) {
73     return nil;
74   }
75   if ((self = [super init])) {
76     _host = [host copy];
77     _path = [path copy];
78     _safety = safety;
79   }
80   return self;
81 }
82
83 - (id)copyWithZone:(NSZone *)zone {
84   GRPCRequestOptions *request =
85       [[GRPCRequestOptions alloc] initWithHost:_host path:_path safety:_safety];
86
87   return request;
88 }
89
90 @end
91
92 @implementation GRPCCall2 {
93   /** Options for the call. */
94   GRPCCallOptions *_callOptions;
95   /** The handler of responses. */
96   id<GRPCResponseHandler> _handler;
97
98   // Thread safety of ivars below are protected by _dispatchQueue.
99
100   /**
101    * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
102    */
103   GRPCCall *_call;
104   /** Flags whether initial metadata has been published to response handler. */
105   BOOL _initialMetadataPublished;
106   /** Streaming call writeable to the underlying call. */
107   GRXBufferedPipe *_pipe;
108   /** Serial dispatch queue for tasks inside the call. */
109   dispatch_queue_t _dispatchQueue;
110   /** Flags whether call has started. */
111   BOOL _started;
112   /** Flags whether call has been canceled. */
113   BOOL _canceled;
114   /** Flags whether call has been finished. */
115   BOOL _finished;
116 }
117
118 - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
119                        responseHandler:(id<GRPCResponseHandler>)responseHandler
120                            callOptions:(GRPCCallOptions *)callOptions {
121   NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
122            @"Neither host nor path can be nil.");
123   NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
124   NSAssert(responseHandler != nil, @"Response handler required.");
125   if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
126     return nil;
127   }
128   if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
129     return nil;
130   }
131   if (responseHandler == nil) {
132     return nil;
133   }
134
135   if ((self = [super init])) {
136     _requestOptions = [requestOptions copy];
137     if (callOptions == nil) {
138       _callOptions = [[GRPCCallOptions alloc] init];
139     } else {
140       _callOptions = [callOptions copy];
141     }
142     _handler = responseHandler;
143     _initialMetadataPublished = NO;
144     _pipe = [GRXBufferedPipe pipe];
145     // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
146 #if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
147     if (@available(iOS 8.0, macOS 10.10, *)) {
148       _dispatchQueue = dispatch_queue_create(
149           NULL,
150           dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
151     } else {
152 #else
153     {
154 #endif
155       _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
156     }
157     dispatch_set_target_queue(_dispatchQueue, responseHandler.dispatchQueue);
158     _started = NO;
159     _canceled = NO;
160     _finished = NO;
161   }
162
163   return self;
164 }
165
166 - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
167                        responseHandler:(id<GRPCResponseHandler>)responseHandler {
168   return
169       [self initWithRequestOptions:requestOptions responseHandler:responseHandler callOptions:nil];
170 }
171
172 - (void)start {
173   GRPCCall *copiedCall = nil;
174   @synchronized(self) {
175     NSAssert(!_started, @"Call already started.");
176     NSAssert(!_canceled, @"Call already canceled.");
177     if (_started) {
178       return;
179     }
180     if (_canceled) {
181       return;
182     }
183
184     _started = YES;
185     if (!_callOptions) {
186       _callOptions = [[GRPCCallOptions alloc] init];
187     }
188
189     _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
190                                       path:_requestOptions.path
191                                 callSafety:_requestOptions.safety
192                             requestsWriter:_pipe
193                                callOptions:_callOptions];
194     if (_callOptions.initialMetadata) {
195       [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
196     }
197     copiedCall = _call;
198   }
199
200   void (^valueHandler)(id value) = ^(id value) {
201     @synchronized(self) {
202       if (self->_handler) {
203         if (!self->_initialMetadataPublished) {
204           self->_initialMetadataPublished = YES;
205           [self issueInitialMetadata:self->_call.responseHeaders];
206         }
207         if (value) {
208           [self issueMessage:value];
209         }
210       }
211     }
212   };
213   void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
214     @synchronized(self) {
215       if (self->_handler) {
216         if (!self->_initialMetadataPublished) {
217           self->_initialMetadataPublished = YES;
218           [self issueInitialMetadata:self->_call.responseHeaders];
219         }
220         [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
221       }
222       // Clearing _call must happen *after* dispatching close in order to get trailing
223       // metadata from _call.
224       if (self->_call) {
225         // Clean up the request writers. This should have no effect to _call since its
226         // response writeable is already nullified.
227         [self->_pipe writesFinishedWithError:nil];
228         self->_call = nil;
229         self->_pipe = nil;
230       }
231     }
232   };
233   id<GRXWriteable> responseWriteable =
234       [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
235   [copiedCall startWithWriteable:responseWriteable];
236 }
237
238 - (void)cancel {
239   GRPCCall *copiedCall = nil;
240   @synchronized(self) {
241     if (_canceled) {
242       return;
243     }
244
245     _canceled = YES;
246
247     copiedCall = _call;
248     _call = nil;
249     _pipe = nil;
250
251     if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
252       dispatch_async(_dispatchQueue, ^{
253         // Copy to local so that block is freed after cancellation completes.
254         id<GRPCResponseHandler> copiedHandler = nil;
255         @synchronized(self) {
256           copiedHandler = self->_handler;
257           self->_handler = nil;
258         }
259
260         [copiedHandler didCloseWithTrailingMetadata:nil
261                                               error:[NSError errorWithDomain:kGRPCErrorDomain
262                                                                         code:GRPCErrorCodeCancelled
263                                                                     userInfo:@{
264                                                                       NSLocalizedDescriptionKey :
265                                                                           @"Canceled by app"
266                                                                     }]];
267       });
268     } else {
269       _handler = nil;
270     }
271   }
272   [copiedCall cancel];
273 }
274
275 - (void)writeData:(NSData *)data {
276   GRXBufferedPipe *copiedPipe = nil;
277   @synchronized(self) {
278     NSAssert(!_canceled, @"Call already canceled.");
279     NSAssert(!_finished, @"Call is half-closed before sending data.");
280     if (_canceled) {
281       return;
282     }
283     if (_finished) {
284       return;
285     }
286
287     if (_pipe) {
288       copiedPipe = _pipe;
289     }
290   }
291   [copiedPipe writeValue:data];
292 }
293
294 - (void)finish {
295   GRXBufferedPipe *copiedPipe = nil;
296   @synchronized(self) {
297     NSAssert(_started, @"Call not started.");
298     NSAssert(!_canceled, @"Call already canceled.");
299     NSAssert(!_finished, @"Call already half-closed.");
300     if (!_started) {
301       return;
302     }
303     if (_canceled) {
304       return;
305     }
306     if (_finished) {
307       return;
308     }
309
310     if (_pipe) {
311       copiedPipe = _pipe;
312       _pipe = nil;
313     }
314     _finished = YES;
315   }
316   [copiedPipe writesFinishedWithError:nil];
317 }
318
319 - (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
320   @synchronized(self) {
321     if (initialMetadata != nil &&
322         [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
323       dispatch_async(_dispatchQueue, ^{
324         id<GRPCResponseHandler> copiedHandler = nil;
325         @synchronized(self) {
326           copiedHandler = self->_handler;
327         }
328         [copiedHandler didReceiveInitialMetadata:initialMetadata];
329       });
330     }
331   }
332 }
333
334 - (void)issueMessage:(id)message {
335   @synchronized(self) {
336     if (message != nil && [_handler respondsToSelector:@selector(didReceiveRawMessage:)]) {
337       dispatch_async(_dispatchQueue, ^{
338         id<GRPCResponseHandler> copiedHandler = nil;
339         @synchronized(self) {
340           copiedHandler = self->_handler;
341         }
342         [copiedHandler didReceiveRawMessage:message];
343       });
344     }
345   }
346 }
347
348 - (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
349   @synchronized(self) {
350     if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
351       dispatch_async(_dispatchQueue, ^{
352         id<GRPCResponseHandler> copiedHandler = nil;
353         @synchronized(self) {
354           copiedHandler = self->_handler;
355           // Clean up _handler so that no more responses are reported to the handler.
356           self->_handler = nil;
357         }
358         [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
359       });
360     } else {
361       _handler = nil;
362     }
363   }
364 }
365
366 @end
367
368 // The following methods of a C gRPC call object aren't reentrant, and thus
369 // calls to them must be serialized:
370 // - start_batch
371 // - destroy
372 //
373 // start_batch with a SEND_MESSAGE argument can only be called after the
374 // OP_COMPLETE event for any previous write is received. This is achieved by
375 // pausing the requests writer immediately every time it writes a value, and
376 // resuming it again when OP_COMPLETE is received.
377 //
378 // Similarly, start_batch with a RECV_MESSAGE argument can only be called after
379 // the OP_COMPLETE event for any previous read is received.This is easier to
380 // enforce, as we're writing the received messages into the writeable:
381 // start_batch is enqueued once upon receiving the OP_COMPLETE event for the
382 // RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
383 // each RECV_MESSAGE batch.
384 @implementation GRPCCall {
385   dispatch_queue_t _callQueue;
386
387   NSString *_host;
388   NSString *_path;
389   GRPCCallSafety _callSafety;
390   GRPCCallOptions *_callOptions;
391   GRPCWrappedCall *_wrappedCall;
392   GRPCConnectivityMonitor *_connectivityMonitor;
393
394   // The C gRPC library has less guarantees on the ordering of events than we
395   // do. Particularly, in the face of errors, there's no ordering guarantee at
396   // all. This wrapper over our actual writeable ensures thread-safety and
397   // correct ordering.
398   GRXConcurrentWriteable *_responseWriteable;
399
400   // The network thread wants the requestWriter to resume (when the server is ready for more input),
401   // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
402   // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
403   // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
404   // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
405   // pause the writer immediately on writeValue:, so we need our locking to be recursive.
406   GRXWriter *_requestWriter;
407
408   // To create a retain cycle when a call is started, up until it finishes. See
409   // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
410   // reference to the call object if all they're interested in is the handler being executed when
411   // the response arrives.
412   GRPCCall *_retainSelf;
413
414   GRPCRequestHeaders *_requestHeaders;
415
416   // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
417   // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
418   // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
419   // the SendClose op is added.
420   BOOL _unaryCall;
421   NSMutableArray *_unaryOpBatch;
422
423   // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
424   // queue
425   dispatch_queue_t _responseQueue;
426
427   // The OAuth2 token fetched from a token provider.
428   NSString *_fetchedOauth2AccessToken;
429 }
430
431 @synthesize state = _state;
432
433 + (void)initialize {
434   // Guarantees the code in {} block is invoked only once. See ref at:
435   // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
436   if (self == [GRPCCall self]) {
437     grpc_init();
438     callFlags = [NSMutableDictionary dictionary];
439   }
440 }
441
442 + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
443   if (host.length == 0 || path.length == 0) {
444     return;
445   }
446   NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
447   @synchronized(callFlags) {
448     switch (callSafety) {
449       case GRPCCallSafetyDefault:
450         callFlags[hostAndPath] = @0;
451         break;
452       case GRPCCallSafetyIdempotentRequest:
453         callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
454         break;
455       case GRPCCallSafetyCacheableRequest:
456         callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
457         break;
458       default:
459         break;
460     }
461   }
462 }
463
464 + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
465   NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
466   @synchronized(callFlags) {
467     return [callFlags[hostAndPath] intValue];
468   }
469 }
470
471 // Designated initializer
472 - (instancetype)initWithHost:(NSString *)host
473                         path:(NSString *)path
474               requestsWriter:(GRXWriter *)requestWriter {
475   return [self initWithHost:host
476                        path:path
477                  callSafety:GRPCCallSafetyDefault
478              requestsWriter:requestWriter
479                 callOptions:nil];
480 }
481
482 - (instancetype)initWithHost:(NSString *)host
483                         path:(NSString *)path
484                   callSafety:(GRPCCallSafety)safety
485               requestsWriter:(GRXWriter *)requestWriter
486                  callOptions:(GRPCCallOptions *)callOptions {
487   // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
488   NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
489   NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
490   NSAssert(requestWriter.state == GRXWriterStateNotStarted,
491            @"The requests writer can't be already started.");
492   if (!host || !path) {
493     return nil;
494   }
495   if (safety > GRPCCallSafetyCacheableRequest) {
496     return nil;
497   }
498   if (requestWriter.state != GRXWriterStateNotStarted) {
499     return nil;
500   }
501
502   if ((self = [super init])) {
503     _host = [host copy];
504     _path = [path copy];
505     _callSafety = safety;
506     _callOptions = [callOptions copy];
507
508     // Serial queue to invoke the non-reentrant methods of the grpc_call object.
509     _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
510
511     _requestWriter = requestWriter;
512
513     _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
514
515     if ([requestWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
516       _unaryCall = YES;
517       _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
518     }
519
520     _responseQueue = dispatch_get_main_queue();
521   }
522   return self;
523 }
524
525 - (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
526   @synchronized(self) {
527     if (_state != GRXWriterStateNotStarted) {
528       return;
529     }
530     _responseQueue = queue;
531   }
532 }
533
534 #pragma mark Finish
535
536 // This function should support being called within a @synchronized(self) block in another function
537 // Should not manipulate _requestWriter for deadlock prevention.
538 - (void)finishWithError:(NSError *)errorOrNil {
539   @synchronized(self) {
540     if (_state == GRXWriterStateFinished) {
541       return;
542     }
543     _state = GRXWriterStateFinished;
544
545     if (errorOrNil) {
546       [_responseWriteable cancelWithError:errorOrNil];
547     } else {
548       [_responseWriteable enqueueSuccessfulCompletion];
549     }
550
551     // If the call isn't retained anywhere else, it can be deallocated now.
552     _retainSelf = nil;
553   }
554 }
555
556 - (void)cancel {
557   @synchronized(self) {
558     if (_state == GRXWriterStateFinished) {
559       return;
560     }
561     [self finishWithError:[NSError
562                               errorWithDomain:kGRPCErrorDomain
563                                          code:GRPCErrorCodeCancelled
564                                      userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
565     [_wrappedCall cancel];
566   }
567   _requestWriter.state = GRXWriterStateFinished;
568 }
569
570 - (void)dealloc {
571   __block GRPCWrappedCall *wrappedCall = _wrappedCall;
572   dispatch_async(_callQueue, ^{
573     wrappedCall = nil;
574   });
575 }
576
577 #pragma mark Read messages
578
579 // Only called from the call queue.
580 // The handler will be called from the network queue.
581 - (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
582   // TODO(jcanizales): Add error handlers for async failures
583   [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]];
584 }
585
586 // Called initially from the network queue once response headers are received,
587 // then "recursively" from the responseWriteable queue after each response from the
588 // server has been written.
589 // If the call is currently paused, this is a noop. Restarting the call will invoke this
590 // method.
591 // TODO(jcanizales): Rename to readResponseIfNotPaused.
592 - (void)startNextRead {
593   @synchronized(self) {
594     if (_state != GRXWriterStateStarted) {
595       return;
596     }
597   }
598
599   dispatch_async(_callQueue, ^{
600     __weak GRPCCall *weakSelf = self;
601     [self startReadWithHandler:^(grpc_byte_buffer *message) {
602       if (message == NULL) {
603         // No more messages from the server
604         return;
605       }
606       __strong GRPCCall *strongSelf = weakSelf;
607       if (strongSelf == nil) {
608         grpc_byte_buffer_destroy(message);
609         return;
610       }
611       NSData *data = [NSData grpc_dataWithByteBuffer:message];
612       grpc_byte_buffer_destroy(message);
613       if (!data) {
614         // The app doesn't have enough memory to hold the server response. We
615         // don't want to throw, because the app shouldn't crash for a behavior
616         // that's on the hands of any server to have. Instead we finish and ask
617         // the server to cancel.
618         @synchronized(strongSelf) {
619           [strongSelf
620               finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
621                                                   code:GRPCErrorCodeResourceExhausted
622                                               userInfo:@{
623                                                 NSLocalizedDescriptionKey :
624                                                     @"Client does not have enough memory to "
625                                                     @"hold the server response."
626                                               }]];
627           [strongSelf->_wrappedCall cancel];
628         }
629         strongSelf->_requestWriter.state = GRXWriterStateFinished;
630       } else {
631         @synchronized(strongSelf) {
632           [strongSelf->_responseWriteable enqueueValue:data
633                                      completionHandler:^{
634                                        [strongSelf startNextRead];
635                                      }];
636         }
637       }
638     }];
639   });
640 }
641
642 #pragma mark Send headers
643
644 - (void)sendHeaders {
645   // TODO (mxyan): Remove after deprecated methods are removed
646   uint32_t callSafetyFlags = 0;
647   switch (_callSafety) {
648     case GRPCCallSafetyDefault:
649       callSafetyFlags = 0;
650       break;
651     case GRPCCallSafetyIdempotentRequest:
652       callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
653       break;
654     case GRPCCallSafetyCacheableRequest:
655       callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
656       break;
657   }
658
659   NSMutableDictionary *headers = [_requestHeaders mutableCopy];
660   NSString *fetchedOauth2AccessToken;
661   @synchronized(self) {
662     fetchedOauth2AccessToken = _fetchedOauth2AccessToken;
663   }
664   if (fetchedOauth2AccessToken != nil) {
665     headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken];
666   } else if (_callOptions.oauth2AccessToken != nil) {
667     headers[@"authorization"] =
668         [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken];
669   }
670
671   // TODO(jcanizales): Add error handlers for async failures
672   GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
673       initWithMetadata:headers
674                  flags:callSafetyFlags
675                handler:nil];  // No clean-up needed after SEND_INITIAL_METADATA
676   dispatch_async(_callQueue, ^{
677     if (!self->_unaryCall) {
678       [self->_wrappedCall startBatchWithOperations:@[ op ]];
679     } else {
680       [self->_unaryOpBatch addObject:op];
681     }
682   });
683 }
684
685 #pragma mark GRXWriteable implementation
686
687 // Only called from the call queue. The error handler will be called from the
688 // network queue if the write didn't succeed.
689 // If the call is a unary call, parameter \a errorHandler will be ignored and
690 // the error handler of GRPCOpSendClose will be executed in case of error.
691 - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
692   __weak GRPCCall *weakSelf = self;
693   void (^resumingHandler)(void) = ^{
694     // Resume the request writer.
695     GRPCCall *strongSelf = weakSelf;
696     if (strongSelf) {
697       strongSelf->_requestWriter.state = GRXWriterStateStarted;
698     }
699   };
700
701   GRPCOpSendMessage *op =
702       [[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler];
703   if (!_unaryCall) {
704     [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler];
705   } else {
706     // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
707     // TODO (mxyan): unify the error handlers of all Ops into a single closure.
708     [_unaryOpBatch addObject:op];
709   }
710 }
711
712 - (void)writeValue:(id)value {
713   NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
714
715   @synchronized(self) {
716     if (_state == GRXWriterStateFinished) {
717       return;
718     }
719   }
720
721   // Pause the input and only resume it when the C layer notifies us that writes
722   // can proceed.
723   _requestWriter.state = GRXWriterStatePaused;
724
725   dispatch_async(_callQueue, ^{
726     // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
727     [self writeMessage:value withErrorHandler:nil];
728   });
729 }
730
731 // Only called from the call queue. The error handler will be called from the
732 // network queue if the requests stream couldn't be closed successfully.
733 - (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
734   if (!_unaryCall) {
735     [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ]
736                               errorHandler:errorHandler];
737   } else {
738     [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
739     [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
740   }
741 }
742
743 - (void)writesFinishedWithError:(NSError *)errorOrNil {
744   if (errorOrNil) {
745     [self cancel];
746   } else {
747     dispatch_async(_callQueue, ^{
748       // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
749       [self finishRequestWithErrorHandler:nil];
750     });
751   }
752 }
753
754 #pragma mark Invoke
755
756 // Both handlers will eventually be called, from the network queue. Writes can start immediately
757 // after this.
758 // The first one (headersHandler), when the response headers are received.
759 // The second one (completionHandler), whenever the RPC finishes for any reason.
760 - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
761                    completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
762   dispatch_async(_callQueue, ^{
763     // TODO(jcanizales): Add error handlers for async failures
764     [self->_wrappedCall
765         startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
766     [self->_wrappedCall
767         startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
768   });
769 }
770
771 - (void)invokeCall {
772   __weak GRPCCall *weakSelf = self;
773   [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
774     // Response headers received.
775     __strong GRPCCall *strongSelf = weakSelf;
776     if (strongSelf) {
777       strongSelf.responseHeaders = headers;
778       [strongSelf startNextRead];
779     }
780   }
781       completionHandler:^(NSError *error, NSDictionary *trailers) {
782         __strong GRPCCall *strongSelf = weakSelf;
783         if (strongSelf) {
784           strongSelf.responseTrailers = trailers;
785
786           if (error) {
787             NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
788             if (error.userInfo) {
789               [userInfo addEntriesFromDictionary:error.userInfo];
790             }
791             userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
792             // Since gRPC core does not guarantee the headers block being called before this block,
793             // responseHeaders might be nil.
794             userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
795             error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
796           }
797           [strongSelf finishWithError:error];
798           strongSelf->_requestWriter.state = GRXWriterStateFinished;
799         }
800       }];
801 }
802
803 #pragma mark GRXWriter implementation
804
805 // Lock acquired inside startWithWriteable:
806 - (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
807   @synchronized(self) {
808     if (_state == GRXWriterStateFinished) {
809       return;
810     }
811
812     _responseWriteable =
813         [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
814
815     GRPCPooledChannel *channel =
816         [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
817     _wrappedCall = [channel wrappedCallWithPath:_path
818                                 completionQueue:[GRPCCompletionQueue completionQueue]
819                                     callOptions:_callOptions];
820
821     if (_wrappedCall == nil) {
822       [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
823                                                 code:GRPCErrorCodeUnavailable
824                                             userInfo:@{
825                                               NSLocalizedDescriptionKey :
826                                                   @"Failed to create call or channel."
827                                             }]];
828       return;
829     }
830
831     [self sendHeaders];
832     [self invokeCall];
833
834     // Connectivity monitor is not required for CFStream
835     char *enableCFStream = getenv(kCFStreamVarName);
836     if (enableCFStream == nil || enableCFStream[0] != '1') {
837       [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
838     }
839   }
840
841   // Now that the RPC has been initiated, request writes can start.
842   [_requestWriter startWithWriteable:self];
843 }
844
845 - (void)startWithWriteable:(id<GRXWriteable>)writeable {
846   id<GRPCAuthorizationProtocol> tokenProvider = nil;
847   @synchronized(self) {
848     _state = GRXWriterStateStarted;
849
850     // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
851     // This makes RPCs in which the call isn't externally retained possible (as long as it is
852     // started before being autoreleased). Care is taken not to retain self strongly in any of the
853     // blocks used in this implementation, so that the life of the instance is determined by this
854     // retain cycle.
855     _retainSelf = self;
856
857     if (_callOptions == nil) {
858       GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
859       if (_serverName.length != 0) {
860         callOptions.serverAuthority = _serverName;
861       }
862       if (_timeout > 0) {
863         callOptions.timeout = _timeout;
864       }
865       uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
866       if (callFlags != 0) {
867         if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
868           _callSafety = GRPCCallSafetyIdempotentRequest;
869         } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
870           _callSafety = GRPCCallSafetyCacheableRequest;
871         }
872       }
873
874       id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
875       if (tokenProvider != nil) {
876         callOptions.authTokenProvider = tokenProvider;
877       }
878       _callOptions = callOptions;
879     }
880
881     NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
882              @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
883
884     tokenProvider = _callOptions.authTokenProvider;
885   }
886
887   if (tokenProvider != nil) {
888     __weak typeof(self) weakSelf = self;
889     [tokenProvider getTokenWithHandler:^(NSString *token) {
890       __strong typeof(self) strongSelf = weakSelf;
891       if (strongSelf) {
892         BOOL startCall = NO;
893         @synchronized(strongSelf) {
894           if (strongSelf->_state != GRXWriterStateFinished) {
895             startCall = YES;
896             if (token) {
897               strongSelf->_fetchedOauth2AccessToken = [token copy];
898             }
899           }
900         }
901         if (startCall) {
902           [strongSelf startCallWithWriteable:writeable];
903         }
904       }
905     }];
906   } else {
907     [self startCallWithWriteable:writeable];
908   }
909 }
910
911 - (void)setState:(GRXWriterState)newState {
912   @synchronized(self) {
913     // Manual transitions are only allowed from the started or paused states.
914     if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
915       return;
916     }
917
918     switch (newState) {
919       case GRXWriterStateFinished:
920         _state = newState;
921         // Per GRXWriter's contract, setting the state to Finished manually
922         // means one doesn't wish the writeable to be messaged anymore.
923         [_responseWriteable cancelSilently];
924         _responseWriteable = nil;
925         return;
926       case GRXWriterStatePaused:
927         _state = newState;
928         return;
929       case GRXWriterStateStarted:
930         if (_state == GRXWriterStatePaused) {
931           _state = newState;
932           [self startNextRead];
933         }
934         return;
935       case GRXWriterStateNotStarted:
936         return;
937     }
938   }
939 }
940
941 - (void)connectivityChanged:(NSNotification *)note {
942   // Cancel underlying call upon this notification.
943
944   // Retain because connectivity manager only keeps weak reference to GRPCCall.
945   __strong GRPCCall *strongSelf = self;
946   if (strongSelf) {
947     @synchronized(strongSelf) {
948       [_wrappedCall cancel];
949       [strongSelf
950           finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
951                                               code:GRPCErrorCodeUnavailable
952                                           userInfo:@{
953                                             NSLocalizedDescriptionKey : @"Connectivity lost."
954                                           }]];
955     }
956     strongSelf->_requestWriter.state = GRXWriterStateFinished;
957   }
958 }
959
960 @end