3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
21 #import "GRPCCall+OAuth2.h"
23 #import <RxLibrary/GRXBufferedPipe.h>
24 #import <RxLibrary/GRXConcurrentWriteable.h>
25 #import <RxLibrary/GRXImmediateSingleWriter.h>
26 #import <RxLibrary/GRXWriter+Immediate.h>
27 #include <grpc/grpc.h>
28 #include <grpc/support/time.h>
30 #import "GRPCCallOptions.h"
31 #import "private/GRPCChannelPool.h"
32 #import "private/GRPCCompletionQueue.h"
33 #import "private/GRPCConnectivityMonitor.h"
34 #import "private/GRPCHost.h"
35 #import "private/GRPCRequestHeaders.h"
36 #import "private/GRPCWrappedCall.h"
37 #import "private/NSData+GRPC.h"
38 #import "private/NSDictionary+GRPC.h"
39 #import "private/NSError+GRPC.h"
41 // At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
42 // SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
43 // and RECV_STATUS_ON_CLIENT.
44 NSInteger kMaxClientBatch = 6;
46 NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
47 NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";
48 static NSMutableDictionary *callFlags;
50 static NSString *const kAuthorizationHeader = @"authorization";
51 static NSString *const kBearerPrefix = @"Bearer ";
53 const char *kCFStreamVarName = "grpc_cfstream";
55 @interface GRPCCall ()<GRXWriteable>
56 // Make them read-write.
57 @property(atomic, strong) NSDictionary *responseHeaders;
58 @property(atomic, strong) NSDictionary *responseTrailers;
60 - (instancetype)initWithHost:(NSString *)host
62 callSafety:(GRPCCallSafety)safety
63 requestsWriter:(GRXWriter *)requestsWriter
64 callOptions:(GRPCCallOptions *)callOptions;
@implementation GRPCRequestOptions

/**
 * Builds the request options for an RPC.
 *
 * @param host Host of the RPC; must be non-empty.
 * @param path Path of the RPC method; must be non-empty.
 * @param safety Safety level of the call.
 * @return The initialized object, or nil (after asserting in debug builds) when host or path is
 *         empty.
 */
- (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
  NSAssert(host.length != 0 && path.length != 0, @"host and path cannot be empty");
  BOOL missingHostOrPath = (host.length == 0 || path.length == 0);
  if (missingHostOrPath) {
    return nil;
  }

  self = [super init];
  if (self != nil) {
    _host = [host copy];
    _path = [path copy];
    _safety = safety;
  }
  return self;
}

/** Returns a new GRPCRequestOptions built from this object's host, path and safety. */
- (id)copyWithZone:(NSZone *)zone {
  return [[GRPCRequestOptions alloc] initWithHost:_host path:_path safety:_safety];
}

@end
92 @implementation GRPCCall2 {
93 /** Options for the call. */
94 GRPCCallOptions *_callOptions;
95 /** The handler of responses. */
96 id<GRPCResponseHandler> _handler;
98 // Thread safety of ivars below are protected by _dispatchQueue.
101 * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
104 /** Flags whether initial metadata has been published to response handler. */
105 BOOL _initialMetadataPublished;
106 /** Streaming call writeable to the underlying call. */
107 GRXBufferedPipe *_pipe;
108 /** Serial dispatch queue for tasks inside the call. */
109 dispatch_queue_t _dispatchQueue;
110 /** Flags whether call has started. */
112 /** Flags whether call has been canceled. */
114 /** Flags whether call has been finished. */
118 - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
119 responseHandler:(id<GRPCResponseHandler>)responseHandler
120 callOptions:(GRPCCallOptions *)callOptions {
121 NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
122 @"Neither host nor path can be nil.");
123 NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
124 NSAssert(responseHandler != nil, @"Response handler required.");
125 if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
128 if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
131 if (responseHandler == nil) {
135 if ((self = [super init])) {
136 _requestOptions = [requestOptions copy];
137 if (callOptions == nil) {
138 _callOptions = [[GRPCCallOptions alloc] init];
140 _callOptions = [callOptions copy];
142 _handler = responseHandler;
143 _initialMetadataPublished = NO;
144 _pipe = [GRXBufferedPipe pipe];
145 // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
146 #if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
147 if (@available(iOS 8.0, macOS 10.10, *)) {
148 _dispatchQueue = dispatch_queue_create(
150 dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
155 _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
157 dispatch_set_target_queue(_dispatchQueue, responseHandler.dispatchQueue);
/**
 * Convenience initializer: forwards to
 * -initWithRequestOptions:responseHandler:callOptions: passing nil for the call options.
 */
- (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
                       responseHandler:(id<GRPCResponseHandler>)responseHandler {
  self = [self initWithRequestOptions:requestOptions
                      responseHandler:responseHandler
                          callOptions:nil];
  return self;
}
173 GRPCCall *copiedCall = nil;
174 @synchronized(self) {
175 NSAssert(!_started, @"Call already started.");
176 NSAssert(!_canceled, @"Call already canceled.");
186 _callOptions = [[GRPCCallOptions alloc] init];
189 _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
190 path:_requestOptions.path
191 callSafety:_requestOptions.safety
193 callOptions:_callOptions];
194 if (_callOptions.initialMetadata) {
195 [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
200 void (^valueHandler)(id value) = ^(id value) {
201 @synchronized(self) {
202 if (self->_handler) {
203 if (!self->_initialMetadataPublished) {
204 self->_initialMetadataPublished = YES;
205 [self issueInitialMetadata:self->_call.responseHeaders];
208 [self issueMessage:value];
213 void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
214 @synchronized(self) {
215 if (self->_handler) {
216 if (!self->_initialMetadataPublished) {
217 self->_initialMetadataPublished = YES;
218 [self issueInitialMetadata:self->_call.responseHeaders];
220 [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
222 // Clearing _call must happen *after* dispatching close in order to get trailing
223 // metadata from _call.
225 // Clean up the request writers. This should have no effect to _call since its
226 // response writeable is already nullified.
227 [self->_pipe writesFinishedWithError:nil];
233 id<GRXWriteable> responseWriteable =
234 [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
235 [copiedCall startWithWriteable:responseWriteable];
239 GRPCCall *copiedCall = nil;
240 @synchronized(self) {
251 if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
252 dispatch_async(_dispatchQueue, ^{
253 // Copy to local so that block is freed after cancellation completes.
254 id<GRPCResponseHandler> copiedHandler = nil;
255 @synchronized(self) {
256 copiedHandler = self->_handler;
257 self->_handler = nil;
260 [copiedHandler didCloseWithTrailingMetadata:nil
261 error:[NSError errorWithDomain:kGRPCErrorDomain
262 code:GRPCErrorCodeCancelled
264 NSLocalizedDescriptionKey :
275 - (void)writeData:(NSData *)data {
276 GRXBufferedPipe *copiedPipe = nil;
277 @synchronized(self) {
278 NSAssert(!_canceled, @"Call already canceled.");
279 NSAssert(!_finished, @"Call is half-closed before sending data.");
291 [copiedPipe writeValue:data];
295 GRXBufferedPipe *copiedPipe = nil;
296 @synchronized(self) {
297 NSAssert(_started, @"Call not started.");
298 NSAssert(!_canceled, @"Call already canceled.");
299 NSAssert(!_finished, @"Call already half-closed.");
316 [copiedPipe writesFinishedWithError:nil];
/**
 * Schedules delivery of the response headers to the handler on _dispatchQueue.
 * Nothing is scheduled when initialMetadata is nil or when the handler does not implement
 * didReceiveInitialMetadata:.
 */
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
  @synchronized(self) {
    if (initialMetadata == nil) {
      return;
    }
    if (![_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
      return;
    }
    dispatch_async(_dispatchQueue, ^{
      // Read the handler under the lock at delivery time; a nil handler makes the send a no-op.
      id<GRPCResponseHandler> handler = nil;
      @synchronized(self) {
        handler = self->_handler;
      }
      [handler didReceiveInitialMetadata:initialMetadata];
    });
  }
}
/**
 * Schedules delivery of one response message to the handler on _dispatchQueue.
 * Nothing is scheduled when message is nil or when the handler does not implement
 * didReceiveRawMessage:.
 */
- (void)issueMessage:(id)message {
  @synchronized(self) {
    if (message == nil) {
      return;
    }
    if (![_handler respondsToSelector:@selector(didReceiveRawMessage:)]) {
      return;
    }
    dispatch_async(_dispatchQueue, ^{
      // Read the handler under the lock at delivery time; a nil handler makes the send a no-op.
      id<GRPCResponseHandler> handler = nil;
      @synchronized(self) {
        handler = self->_handler;
      }
      [handler didReceiveRawMessage:message];
    });
  }
}
/**
 * Reports the end of the call to the handler and drops the handler reference.
 *
 * When the handler implements didCloseWithTrailingMetadata:error:, the callback is scheduled on
 * _dispatchQueue and _handler is cleared right before the callback is made. Otherwise _handler
 * is cleared immediately.
 */
- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  @synchronized(self) {
    BOOL handlerWantsClose =
        [_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)];
    if (!handlerWantsClose) {
      _handler = nil;
      return;
    }
    dispatch_async(_dispatchQueue, ^{
      id<GRPCResponseHandler> handler = nil;
      @synchronized(self) {
        handler = self->_handler;
        // Clean up _handler so that no more responses are reported to the handler.
        self->_handler = nil;
      }
      [handler didCloseWithTrailingMetadata:trailingMetadata error:error];
    });
  }
}
368 // The following methods of a C gRPC call object aren't reentrant, and thus
369 // calls to them must be serialized:
373 // start_batch with a SEND_MESSAGE argument can only be called after the
374 // OP_COMPLETE event for any previous write is received. This is achieved by
375 // pausing the requests writer immediately every time it writes a value, and
376 // resuming it again when OP_COMPLETE is received.
378 // Similarly, start_batch with a RECV_MESSAGE argument can only be called after
// the OP_COMPLETE event for any previous read is received. This is easier to
380 // enforce, as we're writing the received messages into the writeable:
381 // start_batch is enqueued once upon receiving the OP_COMPLETE event for the
382 // RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
383 // each RECV_MESSAGE batch.
384 @implementation GRPCCall {
385 dispatch_queue_t _callQueue;
389 GRPCCallSafety _callSafety;
390 GRPCCallOptions *_callOptions;
391 GRPCWrappedCall *_wrappedCall;
392 GRPCConnectivityMonitor *_connectivityMonitor;
// The C gRPC library has fewer guarantees on the ordering of events than we
395 // do. Particularly, in the face of errors, there's no ordering guarantee at
396 // all. This wrapper over our actual writeable ensures thread-safety and
398 GRXConcurrentWriteable *_responseWriteable;
400 // The network thread wants the requestWriter to resume (when the server is ready for more input),
401 // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
402 // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
403 // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
404 // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
405 // pause the writer immediately on writeValue:, so we need our locking to be recursive.
406 GRXWriter *_requestWriter;
408 // To create a retain cycle when a call is started, up until it finishes. See
409 // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
410 // reference to the call object if all they're interested in is the handler being executed when
411 // the response arrives.
412 GRPCCall *_retainSelf;
414 GRPCRequestHeaders *_requestHeaders;
416 // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
417 // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
418 // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
419 // the SendClose op is added.
421 NSMutableArray *_unaryOpBatch;
423 // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
425 dispatch_queue_t _responseQueue;
427 // The OAuth2 token fetched from a token provider.
428 NSString *_fetchedOauth2AccessToken;
431 @synthesize state = _state;
434 // Guarantees the code in {} block is invoked only once. See ref at:
435 // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
436 if (self == [GRPCCall self]) {
438 callFlags = [NSMutableDictionary dictionary];
/**
 * Records the initial-metadata flag that corresponds to callSafety for the given host and path
 * in the class-wide callFlags table. Does nothing when host or path is empty, or when
 * callSafety is not one of the known values.
 */
+ (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
  if (host.length == 0 || path.length == 0) {
    return;
  }

  NSNumber *flags = nil;
  if (callSafety == GRPCCallSafetyDefault) {
    flags = @0;
  } else if (callSafety == GRPCCallSafetyIdempotentRequest) {
    flags = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
  } else if (callSafety == GRPCCallSafetyCacheableRequest) {
    flags = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
  } else {
    // Unknown safety value: leave the table untouched.
    return;
  }

  NSString *tableKey = [NSString stringWithFormat:@"%@/%@", host, path];
  @synchronized(callFlags) {
    callFlags[tableKey] = flags;
  }
}
/**
 * Looks up the initial-metadata flags registered for the given host and path.
 *
 * @return The stored flags, or 0 when nothing is registered for the host/path pair (messaging the
 *         missing nil entry yields 0).
 */
+ (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
  NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
  @synchronized(callFlags) {
    // The return type is unsigned, so read the boxed number with the unsigned accessor instead
    // of going through a signed int.
    return [callFlags[hostAndPath] unsignedIntValue];
  }
}
// Designated initializer
/**
 * Creates a call with the default call safety and no call options by forwarding to
 * -initWithHost:path:callSafety:requestsWriter:callOptions:.
 */
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
              requestsWriter:(GRXWriter *)requestWriter {
  self = [self initWithHost:host
                       path:path
                 callSafety:GRPCCallSafetyDefault
             requestsWriter:requestWriter
                callOptions:nil];
  return self;
}
/**
 * Creates a call to the given host and path.
 *
 * Asserts in debug builds, and returns nil in release builds, when host or path is nil, when
 * safety is beyond GRPCCallSafetyCacheableRequest, or when requestWriter has already been started.
 * A GRXImmediateSingleWriter request writer marks the call as unary, in which case ops are
 * collected in _unaryOpBatch. Responses are dispatched to the main queue by default.
 */
- (instancetype)initWithHost:(NSString *)host
                        path:(NSString *)path
                  callSafety:(GRPCCallSafety)safety
              requestsWriter:(GRXWriter *)requestWriter
                 callOptions:(GRPCCallOptions *)callOptions {
  // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
  NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
  NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
  NSAssert(requestWriter.state == GRXWriterStateNotStarted,
           @"The requests writer can't be already started.");
  if (!host || !path || safety > GRPCCallSafetyCacheableRequest ||
      requestWriter.state != GRXWriterStateNotStarted) {
    return nil;
  }

  self = [super init];
  if (self == nil) {
    return nil;
  }

  _host = [host copy];
  _path = [path copy];
  _callSafety = safety;
  _callOptions = [callOptions copy];

  // Serial queue to invoke the non-reentrant methods of the grpc_call object.
  _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);

  _requestWriter = requestWriter;
  _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];

  BOOL writerIsSingleValue = [requestWriter isKindOfClass:[GRXImmediateSingleWriter class]];
  if (writerIsSingleValue) {
    _unaryCall = YES;
    _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
  }

  _responseQueue = dispatch_get_main_queue();
  return self;
}
/**
 * Sets the queue on which responses are delivered. The request is ignored unless the call is
 * still in the not-started state.
 */
- (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
  @synchronized(self) {
    BOOL notYetStarted = (_state == GRXWriterStateNotStarted);
    if (notYetStarted) {
      _responseQueue = queue;
    }
  }
}
// This function should support being called within a @synchronized(self) block in another function
// Should not manipulate _requestWriter for deadlock prevention.
/**
 * Moves the call to the finished state and notifies the response writeable: with the error when
 * errorOrNil is non-nil, with a successful completion otherwise. Calling it again once the call
 * is finished has no effect.
 */
- (void)finishWithError:(NSError *)errorOrNil {
  @synchronized(self) {
    BOOL alreadyFinished = (_state == GRXWriterStateFinished);
    if (alreadyFinished) {
      return;
    }
    _state = GRXWriterStateFinished;

    if (errorOrNil == nil) {
      [_responseWriteable enqueueSuccessfulCompletion];
    } else {
      [_responseWriteable cancelWithError:errorOrNil];
    }

    // If the call isn't retained anywhere else, it can be deallocated now.
    _retainSelf = nil;
  }
}
557 @synchronized(self) {
558 if (_state == GRXWriterStateFinished) {
561 [self finishWithError:[NSError
562 errorWithDomain:kGRPCErrorDomain
563 code:GRPCErrorCodeCancelled
564 userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
565 [_wrappedCall cancel];
567 _requestWriter.state = GRXWriterStateFinished;
571 __block GRPCWrappedCall *wrappedCall = _wrappedCall;
572 dispatch_async(_callQueue, ^{
577 #pragma mark Read messages
// Only called from the call queue.
// The handler will be called from the network queue.
/** Starts a batch on the wrapped call containing a single receive-message op. */
- (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
  // TODO(jcanizales): Add error handlers for async failures
  GRPCOpRecvMessage *receiveOp = [[GRPCOpRecvMessage alloc] initWithHandler:handler];
  NSArray *batch = @[ receiveOp ];
  [_wrappedCall startBatchWithOperations:batch];
}
586 // Called initially from the network queue once response headers are received,
587 // then "recursively" from the responseWriteable queue after each response from the
588 // server has been written.
589 // If the call is currently paused, this is a noop. Restarting the call will invoke this
591 // TODO(jcanizales): Rename to readResponseIfNotPaused.
592 - (void)startNextRead {
593 @synchronized(self) {
594 if (_state != GRXWriterStateStarted) {
599 dispatch_async(_callQueue, ^{
600 __weak GRPCCall *weakSelf = self;
601 [self startReadWithHandler:^(grpc_byte_buffer *message) {
602 if (message == NULL) {
603 // No more messages from the server
606 __strong GRPCCall *strongSelf = weakSelf;
607 if (strongSelf == nil) {
608 grpc_byte_buffer_destroy(message);
611 NSData *data = [NSData grpc_dataWithByteBuffer:message];
612 grpc_byte_buffer_destroy(message);
614 // The app doesn't have enough memory to hold the server response. We
615 // don't want to throw, because the app shouldn't crash for a behavior
616 // that's on the hands of any server to have. Instead we finish and ask
617 // the server to cancel.
618 @synchronized(strongSelf) {
620 finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
621 code:GRPCErrorCodeResourceExhausted
623 NSLocalizedDescriptionKey :
624 @"Client does not have enough memory to "
625 @"hold the server response."
627 [strongSelf->_wrappedCall cancel];
629 strongSelf->_requestWriter.state = GRXWriterStateFinished;
631 @synchronized(strongSelf) {
632 [strongSelf->_responseWriteable enqueueValue:data
634 [strongSelf startNextRead];
642 #pragma mark Send headers
/**
 * Builds the initial metadata for the call and hands it to the call queue.
 *
 * The metadata is a copy of _requestHeaders plus, when an OAuth2 token is available, an
 * authorization header of the form "Bearer <token>". A token fetched from a token provider takes
 * precedence over the one set in the call options. On the call queue the send-metadata op is
 * either started immediately or, for unary calls, appended to _unaryOpBatch.
 */
- (void)sendHeaders {
  // TODO (mxyan): Remove after deprecated methods are removed
  uint32_t callSafetyFlags = 0;
  switch (_callSafety) {
    case GRPCCallSafetyDefault:
      callSafetyFlags = 0;
      break;
    case GRPCCallSafetyIdempotentRequest:
      callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
      break;
    case GRPCCallSafetyCacheableRequest:
      callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
      break;
  }

  NSMutableDictionary *headers = [_requestHeaders mutableCopy];

  // _fetchedOauth2AccessToken is written under the same lock by the token provider callback.
  NSString *accessToken = nil;
  @synchronized(self) {
    accessToken = _fetchedOauth2AccessToken;
  }
  if (accessToken == nil) {
    accessToken = _callOptions.oauth2AccessToken;
  }
  if (accessToken != nil) {
    // Use the file-level header-name constant rather than repeating the string literal.
    headers[kAuthorizationHeader] = [kBearerPrefix stringByAppendingString:accessToken];
  }

  // TODO(jcanizales): Add error handlers for async failures
  GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
      initWithMetadata:headers
                 flags:callSafetyFlags
               handler:nil];  // No clean-up needed after SEND_INITIAL_METADATA
  dispatch_async(_callQueue, ^{
    if (!self->_unaryCall) {
      [self->_wrappedCall startBatchWithOperations:@[ op ]];
    } else {
      [self->_unaryOpBatch addObject:op];
    }
  });
}
685 #pragma mark GRXWriteable implementation
// Only called from the call queue. The error handler will be called from the
// network queue if the write didn't succeed.
// If the call is a unary call, parameter \a errorHandler will be ignored and
// the error handler of GRPCOpSendClose will be executed in case of error.
/**
 * Wraps message in a send-message op whose handler sets the request writer back to the started
 * state, then either starts the op on the wrapped call or, for unary calls, appends it to
 * _unaryOpBatch.
 */
- (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
  __weak GRPCCall *weakCall = self;
  GRPCOpSendMessage *sendOp = [[GRPCOpSendMessage alloc]
      initWithMessage:message
              handler:^{
                // Resume the request writer.
                GRPCCall *call = weakCall;
                if (call != nil) {
                  call->_requestWriter.state = GRXWriterStateStarted;
                }
              }];

  if (_unaryCall) {
    // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
    // TODO (mxyan): unify the error handlers of all Ops into a single closure.
    [_unaryOpBatch addObject:sendOp];
    return;
  }
  [_wrappedCall startBatchWithOperations:@[ sendOp ] errorHandler:errorHandler];
}
/**
 * GRXWriteable entry point for one request message. Asserts that value is an NSData, drops the
 * value when the call is already finished, and otherwise pauses the request writer and schedules
 * the write on the call queue.
 */
- (void)writeValue:(id)value {
  NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");

  BOOL callFinished = NO;
  @synchronized(self) {
    callFinished = (_state == GRXWriterStateFinished);
  }
  if (callFinished) {
    return;
  }

  // Pause the input and only resume it when the C layer notifies us that writes
  // succeeded.
  _requestWriter.state = GRXWriterStatePaused;

  dispatch_async(_callQueue, ^{
    // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
    [self writeMessage:value withErrorHandler:nil];
  });
}
// Only called from the call queue. The error handler will be called from the
// network queue if the requests stream couldn't be closed successfully.
/**
 * Half-closes the request stream. For streaming calls a send-close op is started on its own; for
 * unary calls the send-close op is appended to _unaryOpBatch and the whole batch is started.
 */
- (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
  GRPCOpSendClose *closeOp = [[GRPCOpSendClose alloc] init];
  if (_unaryCall) {
    [_unaryOpBatch addObject:closeOp];
    [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
    return;
  }
  [_wrappedCall startBatchWithOperations:@[ closeOp ] errorHandler:errorHandler];
}
/**
 * GRXWriteable entry point signalling that the request writer is done. A non-nil error cancels
 * the call; otherwise the request stream is half-closed on the call queue.
 */
- (void)writesFinishedWithError:(NSError *)errorOrNil {
  if (errorOrNil != nil) {
    [self cancel];
    return;
  }
  dispatch_async(_callQueue, ^{
    // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
    [self finishRequestWithErrorHandler:nil];
  });
}
// Both handlers will eventually be called, from the network queue. Writes can start immediately
// after this.
// The first one (headersHandler), when the response headers are received.
// The second one (completionHandler), whenever the RPC finishes for any reason.
/**
 * Schedules, on the call queue, two separate batches on the wrapped call: one receiving the
 * response metadata and one receiving the final status.
 */
- (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
                   completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
  dispatch_async(_callQueue, ^{
    // TODO(jcanizales): Add error handlers for async failures
    GRPCOpRecvMetadata *metadataOp = [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler];
    [self->_wrappedCall startBatchWithOperations:@[ metadataOp ]];

    GRPCOpRecvStatus *statusOp = [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler];
    [self->_wrappedCall startBatchWithOperations:@[ statusOp ]];
  });
}
772 __weak GRPCCall *weakSelf = self;
773 [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
774 // Response headers received.
775 __strong GRPCCall *strongSelf = weakSelf;
777 strongSelf.responseHeaders = headers;
778 [strongSelf startNextRead];
781 completionHandler:^(NSError *error, NSDictionary *trailers) {
782 __strong GRPCCall *strongSelf = weakSelf;
784 strongSelf.responseTrailers = trailers;
787 NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
788 if (error.userInfo) {
789 [userInfo addEntriesFromDictionary:error.userInfo];
791 userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
792 // Since gRPC core does not guarantee the headers block being called before this block,
793 // responseHeaders might be nil.
794 userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
795 error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
797 [strongSelf finishWithError:error];
798 strongSelf->_requestWriter.state = GRXWriterStateFinished;
803 #pragma mark GRXWriter implementation
805 // Lock acquired inside startWithWriteable:
806 - (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
807 @synchronized(self) {
808 if (_state == GRXWriterStateFinished) {
813 [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
815 GRPCPooledChannel *channel =
816 [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
817 _wrappedCall = [channel wrappedCallWithPath:_path
818 completionQueue:[GRPCCompletionQueue completionQueue]
819 callOptions:_callOptions];
821 if (_wrappedCall == nil) {
822 [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
823 code:GRPCErrorCodeUnavailable
825 NSLocalizedDescriptionKey :
826 @"Failed to create call or channel."
834 // Connectivity monitor is not required for CFStream
835 char *enableCFStream = getenv(kCFStreamVarName);
836 if (enableCFStream == nil || enableCFStream[0] != '1') {
837 [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
841 // Now that the RPC has been initiated, request writes can start.
842 [_requestWriter startWithWriteable:self];
845 - (void)startWithWriteable:(id<GRXWriteable>)writeable {
846 id<GRPCAuthorizationProtocol> tokenProvider = nil;
847 @synchronized(self) {
848 _state = GRXWriterStateStarted;
850 // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
851 // This makes RPCs in which the call isn't externally retained possible (as long as it is
852 // started before being autoreleased). Care is taken not to retain self strongly in any of the
853 // blocks used in this implementation, so that the life of the instance is determined by this
857 if (_callOptions == nil) {
858 GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
859 if (_serverName.length != 0) {
860 callOptions.serverAuthority = _serverName;
863 callOptions.timeout = _timeout;
865 uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
866 if (callFlags != 0) {
867 if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
868 _callSafety = GRPCCallSafetyIdempotentRequest;
869 } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
870 _callSafety = GRPCCallSafetyCacheableRequest;
874 id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
875 if (tokenProvider != nil) {
876 callOptions.authTokenProvider = tokenProvider;
878 _callOptions = callOptions;
881 NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
882 @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
884 tokenProvider = _callOptions.authTokenProvider;
887 if (tokenProvider != nil) {
888 __weak typeof(self) weakSelf = self;
889 [tokenProvider getTokenWithHandler:^(NSString *token) {
890 __strong typeof(self) strongSelf = weakSelf;
893 @synchronized(strongSelf) {
894 if (strongSelf->_state != GRXWriterStateFinished) {
897 strongSelf->_fetchedOauth2AccessToken = [token copy];
902 [strongSelf startCallWithWriteable:writeable];
907 [self startCallWithWriteable:writeable];
911 - (void)setState:(GRXWriterState)newState {
912 @synchronized(self) {
913 // Manual transitions are only allowed from the started or paused states.
914 if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
919 case GRXWriterStateFinished:
921 // Per GRXWriter's contract, setting the state to Finished manually
922 // means one doesn't wish the writeable to be messaged anymore.
923 [_responseWriteable cancelSilently];
924 _responseWriteable = nil;
926 case GRXWriterStatePaused:
929 case GRXWriterStateStarted:
930 if (_state == GRXWriterStatePaused) {
932 [self startNextRead];
935 case GRXWriterStateNotStarted:
941 - (void)connectivityChanged:(NSNotification *)note {
942 // Cancel underlying call upon this notification.
944 // Retain because connectivity manager only keeps weak reference to GRPCCall.
945 __strong GRPCCall *strongSelf = self;
947 @synchronized(strongSelf) {
948 [_wrappedCall cancel];
950 finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
951 code:GRPCErrorCodeUnavailable
953 NSLocalizedDescriptionKey : @"Connectivity lost."
956 strongSelf->_requestWriter.state = GRXWriterStateFinished;