3 * Copyright 2015 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
20 #import "GRPCCall+OAuth2.h"
21 #import "GRPCCallOptions.h"
22 #import "GRPCInterceptor.h"
24 #import <RxLibrary/GRXBufferedPipe.h>
25 #import <RxLibrary/GRXConcurrentWriteable.h>
26 #import <RxLibrary/GRXImmediateSingleWriter.h>
27 #import <RxLibrary/GRXWriter+Immediate.h>
28 #include <grpc/grpc.h>
29 #include <grpc/support/time.h>
31 #import "private/GRPCCall+V2API.h"
32 #import "private/GRPCCallInternal.h"
33 #import "private/GRPCChannelPool.h"
34 #import "private/GRPCCompletionQueue.h"
35 #import "private/GRPCConnectivityMonitor.h"
36 #import "private/GRPCHost.h"
37 #import "private/GRPCRequestHeaders.h"
38 #import "private/GRPCWrappedCall.h"
39 #import "private/NSData+GRPC.h"
40 #import "private/NSDictionary+GRPC.h"
41 #import "private/NSError+GRPC.h"
43 // At most 6 ops can be in an op batch for a client: SEND_INITIAL_METADATA,
44 // SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT, RECV_INITIAL_METADATA, RECV_MESSAGE,
45 // and RECV_STATUS_ON_CLIENT.
46 NSInteger kMaxClientBatch = 6;
48 NSString *const kGRPCHeadersKey = @"io.grpc.HeadersKey";
49 NSString *const kGRPCTrailersKey = @"io.grpc.TrailersKey";
50 static NSMutableDictionary *callFlags;
52 static NSString *const kAuthorizationHeader = @"authorization";
53 static NSString *const kBearerPrefix = @"Bearer ";
55 const char *kCFStreamVarName = "grpc_cfstream";
57 @interface GRPCCall ()<GRXWriteable>
58 // Make them read-write.
59 @property(atomic, strong) NSDictionary *responseHeaders;
60 @property(atomic, strong) NSDictionary *responseTrailers;
62 - (void)receiveNextMessages:(NSUInteger)numberOfMessages;
64 - (instancetype)initWithHost:(NSString *)host
66 callSafety:(GRPCCallSafety)safety
67 requestsWriter:(GRXWriter *)requestsWriter
68 callOptions:(GRPCCallOptions *)callOptions
69 writeDone:(void (^)(void))writeDone;
73 @implementation GRPCRequestOptions
75 - (instancetype)initWithHost:(NSString *)host path:(NSString *)path safety:(GRPCCallSafety)safety {
76 NSAssert(host.length != 0 && path.length != 0, @"host and path cannot be empty");
77 if (host.length == 0 || path.length == 0) {
80 if ((self = [super init])) {
88 - (id)copyWithZone:(NSZone *)zone {
89 GRPCRequestOptions *request =
90 [[GRPCRequestOptions alloc] initWithHost:_host path:_path safety:_safety];
98 * This class acts as a wrapper for interceptors
100 @implementation GRPCCall2 {
101 /** The handler of responses. */
102 id<GRPCResponseHandler> _responseHandler;
105 * Points to the first interceptor in the interceptor chain.
107 id<GRPCInterceptorInterface> _firstInterceptor;
110 * The actual call options being used by this call. It is different from the user-provided
111 * call options when the user provided a NULL call options object.
113 GRPCCallOptions *_actualCallOptions;
116 - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
117 responseHandler:(id<GRPCResponseHandler>)responseHandler
118 callOptions:(GRPCCallOptions *)callOptions {
119 NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
120 @"Neither host nor path can be nil.");
121 NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
122 NSAssert(responseHandler != nil, @"Response handler required.");
123 if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
126 if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
129 if (responseHandler == nil) {
133 if ((self = [super init])) {
134 _requestOptions = [requestOptions copy];
135 _callOptions = [callOptions copy];
137 _actualCallOptions = [[GRPCCallOptions alloc] init];
139 _actualCallOptions = [callOptions copy];
141 _responseHandler = responseHandler;
143 // Initialize the interceptor chain
144 GRPCCall2Internal *internalCall = [[GRPCCall2Internal alloc] init];
145 id<GRPCInterceptorInterface> nextInterceptor = internalCall;
146 GRPCInterceptorManager *nextManager = nil;
147 NSArray *interceptorFactories = _actualCallOptions.interceptorFactories;
148 if (interceptorFactories.count == 0) {
149 [internalCall setResponseHandler:_responseHandler];
151 for (int i = (int)interceptorFactories.count - 1; i >= 0; i--) {
152 GRPCInterceptorManager *manager =
153 [[GRPCInterceptorManager alloc] initWithNextInterceptor:nextInterceptor];
154 GRPCInterceptor *interceptor =
155 [interceptorFactories[i] createInterceptorWithManager:manager];
156 NSAssert(interceptor != nil, @"Failed to create interceptor");
157 if (interceptor == nil) {
160 if (i == (int)interceptorFactories.count - 1) {
161 [internalCall setResponseHandler:interceptor];
163 [nextManager setPreviousInterceptor:interceptor];
165 nextInterceptor = interceptor;
166 nextManager = manager;
169 [nextManager setPreviousInterceptor:_responseHandler];
171 _firstInterceptor = nextInterceptor;
177 - (instancetype)initWithRequestOptions:(GRPCRequestOptions *)requestOptions
178 responseHandler:(id<GRPCResponseHandler>)responseHandler {
180 [self initWithRequestOptions:requestOptions responseHandler:responseHandler callOptions:nil];
184 id<GRPCInterceptorInterface> copiedFirstInterceptor;
185 @synchronized(self) {
186 copiedFirstInterceptor = _firstInterceptor;
188 GRPCRequestOptions *requestOptions = [_requestOptions copy];
189 GRPCCallOptions *callOptions = [_actualCallOptions copy];
190 if ([copiedFirstInterceptor respondsToSelector:@selector(startWithRequestOptions:callOptions:)]) {
191 dispatch_async(copiedFirstInterceptor.requestDispatchQueue, ^{
192 [copiedFirstInterceptor startWithRequestOptions:requestOptions callOptions:callOptions];
198 id<GRPCInterceptorInterface> copiedFirstInterceptor;
199 @synchronized(self) {
200 copiedFirstInterceptor = _firstInterceptor;
202 if ([copiedFirstInterceptor respondsToSelector:@selector(cancel)]) {
203 dispatch_async(copiedFirstInterceptor.requestDispatchQueue, ^{
204 [copiedFirstInterceptor cancel];
209 - (void)writeData:(id)data {
210 id<GRPCInterceptorInterface> copiedFirstInterceptor;
211 @synchronized(self) {
212 copiedFirstInterceptor = _firstInterceptor;
214 if ([copiedFirstInterceptor respondsToSelector:@selector(writeData:)]) {
215 dispatch_async(copiedFirstInterceptor.requestDispatchQueue, ^{
216 [copiedFirstInterceptor writeData:data];
222 id<GRPCInterceptorInterface> copiedFirstInterceptor;
223 @synchronized(self) {
224 copiedFirstInterceptor = _firstInterceptor;
226 if ([copiedFirstInterceptor respondsToSelector:@selector(finish)]) {
227 dispatch_async(copiedFirstInterceptor.requestDispatchQueue, ^{
228 [copiedFirstInterceptor finish];
233 - (void)receiveNextMessages:(NSUInteger)numberOfMessages {
234 id<GRPCInterceptorInterface> copiedFirstInterceptor;
235 @synchronized(self) {
236 copiedFirstInterceptor = _firstInterceptor;
238 if ([copiedFirstInterceptor respondsToSelector:@selector(receiveNextMessages:)]) {
239 dispatch_async(copiedFirstInterceptor.requestDispatchQueue, ^{
240 [copiedFirstInterceptor receiveNextMessages:numberOfMessages];
247 // The following methods of a C gRPC call object aren't reentrant, and thus
248 // calls to them must be serialized:
252 // start_batch with a SEND_MESSAGE argument can only be called after the
253 // OP_COMPLETE event for any previous write is received. This is achieved by
254 // pausing the requests writer immediately every time it writes a value, and
255 // resuming it again when OP_COMPLETE is received.
257 // Similarly, start_batch with a RECV_MESSAGE argument can only be called after
258 // the OP_COMPLETE event for any previous read is received.This is easier to
259 // enforce, as we're writing the received messages into the writeable:
260 // start_batch is enqueued once upon receiving the OP_COMPLETE event for the
261 // RECV_METADATA batch, and then once after receiving each OP_COMPLETE event for
262 // each RECV_MESSAGE batch.
263 @implementation GRPCCall {
264 dispatch_queue_t _callQueue;
268 GRPCCallSafety _callSafety;
269 GRPCCallOptions *_callOptions;
270 GRPCWrappedCall *_wrappedCall;
271 GRPCConnectivityMonitor *_connectivityMonitor;
273 // The C gRPC library has less guarantees on the ordering of events than we
274 // do. Particularly, in the face of errors, there's no ordering guarantee at
275 // all. This wrapper over our actual writeable ensures thread-safety and
277 GRXConcurrentWriteable *_responseWriteable;
279 // The network thread wants the requestWriter to resume (when the server is ready for more input),
280 // or to stop (on errors), concurrently with user threads that want to start it, pause it or stop
281 // it. Because a writer isn't thread-safe, we'll synchronize those operations on it.
282 // We don't use a dispatch queue for that purpose, because the writer can call writeValue: or
283 // writesFinishedWithError: on this GRPCCall as part of those operations. We want to be able to
284 // pause the writer immediately on writeValue:, so we need our locking to be recursive.
285 GRXWriter *_requestWriter;
287 // To create a retain cycle when a call is started, up until it finishes. See
288 // |startWithWriteable:| and |finishWithError:|. This saves users from having to retain a
289 // reference to the call object if all they're interested in is the handler being executed when
290 // the response arrives.
291 GRPCCall *_retainSelf;
293 GRPCRequestHeaders *_requestHeaders;
295 // In the case that the call is a unary call (i.e. the writer to GRPCCall is of type
296 // GRXImmediateSingleWriter), GRPCCall will delay sending ops (not send them to C core
297 // immediately) and buffer them into a batch _unaryOpBatch. The batch is sent to C core when
298 // the SendClose op is added.
300 NSMutableArray *_unaryOpBatch;
302 // The dispatch queue to be used for enqueuing responses to user. Defaulted to the main dispatch
304 dispatch_queue_t _responseQueue;
306 // The OAuth2 token fetched from a token provider.
307 NSString *_fetchedOauth2AccessToken;
309 // The callback to be called when a write message op is done.
310 void (^_writeDone)(void);
312 // Indicate a read request to core is pending.
313 BOOL _pendingCoreRead;
315 // Indicate pending read message request from user.
316 NSUInteger _pendingReceiveNextMessages;
319 @synthesize state = _state;
322 // Guarantees the code in {} block is invoked only once. See ref at:
323 // https://developer.apple.com/documentation/objectivec/nsobject/1418639-initialize?language=objc
324 if (self == [GRPCCall self]) {
325 // Enable CFStream by default by do not overwrite if the user explicitly disables CFStream with
326 // environment variable "grpc_cfstream=0"
327 setenv(kCFStreamVarName, "1", 0);
329 callFlags = [NSMutableDictionary dictionary];
333 + (void)setCallSafety:(GRPCCallSafety)callSafety host:(NSString *)host path:(NSString *)path {
334 if (host.length == 0 || path.length == 0) {
337 NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
338 @synchronized(callFlags) {
339 switch (callSafety) {
340 case GRPCCallSafetyDefault:
341 callFlags[hostAndPath] = @0;
343 case GRPCCallSafetyIdempotentRequest:
344 callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
346 case GRPCCallSafetyCacheableRequest:
347 callFlags[hostAndPath] = @GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
355 + (uint32_t)callFlagsForHost:(NSString *)host path:(NSString *)path {
356 NSString *hostAndPath = [NSString stringWithFormat:@"%@/%@", host, path];
357 @synchronized(callFlags) {
358 return [callFlags[hostAndPath] intValue];
362 // Designated initializer
363 - (instancetype)initWithHost:(NSString *)host
364 path:(NSString *)path
365 requestsWriter:(GRXWriter *)requestWriter {
366 return [self initWithHost:host
368 callSafety:GRPCCallSafetyDefault
369 requestsWriter:requestWriter
373 - (instancetype)initWithHost:(NSString *)host
374 path:(NSString *)path
375 callSafety:(GRPCCallSafety)safety
376 requestsWriter:(GRXWriter *)requestsWriter
377 callOptions:(GRPCCallOptions *)callOptions {
378 return [self initWithHost:host
381 requestsWriter:requestsWriter
382 callOptions:callOptions
386 - (instancetype)initWithHost:(NSString *)host
387 path:(NSString *)path
388 callSafety:(GRPCCallSafety)safety
389 requestsWriter:(GRXWriter *)requestsWriter
390 callOptions:(GRPCCallOptions *)callOptions
391 writeDone:(void (^)(void))writeDone {
392 // Purposely using pointer rather than length (host.length == 0) for backwards compatibility.
393 NSAssert(host != nil && path != nil, @"Neither host nor path can be nil.");
394 NSAssert(safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
395 NSAssert(requestsWriter.state == GRXWriterStateNotStarted,
396 @"The requests writer can't be already started.");
397 if (!host || !path) {
400 if (safety > GRPCCallSafetyCacheableRequest) {
403 if (requestsWriter.state != GRXWriterStateNotStarted) {
407 if ((self = [super init])) {
410 _callSafety = safety;
411 _callOptions = [callOptions copy];
413 // Serial queue to invoke the non-reentrant methods of the grpc_call object.
414 _callQueue = dispatch_queue_create("io.grpc.call", DISPATCH_QUEUE_SERIAL);
416 _requestWriter = requestsWriter;
417 _requestHeaders = [[GRPCRequestHeaders alloc] initWithCall:self];
418 _writeDone = writeDone;
420 if ([requestsWriter isKindOfClass:[GRXImmediateSingleWriter class]]) {
422 _unaryOpBatch = [NSMutableArray arrayWithCapacity:kMaxClientBatch];
425 _responseQueue = dispatch_get_main_queue();
427 // do not start a read until initial metadata is received
428 _pendingReceiveNextMessages = 0;
429 _pendingCoreRead = YES;
434 - (void)setResponseDispatchQueue:(dispatch_queue_t)queue {
435 @synchronized(self) {
436 if (_state != GRXWriterStateNotStarted) {
439 _responseQueue = queue;
445 // This function should support being called within a @synchronized(self) block in another function
446 // Should not manipulate _requestWriter for deadlock prevention.
447 - (void)finishWithError:(NSError *)errorOrNil {
448 @synchronized(self) {
449 if (_state == GRXWriterStateFinished) {
452 _state = GRXWriterStateFinished;
455 [_responseWriteable cancelWithError:errorOrNil];
457 [_responseWriteable enqueueSuccessfulCompletion];
460 // If the call isn't retained anywhere else, it can be deallocated now.
466 @synchronized(self) {
467 if (_state == GRXWriterStateFinished) {
470 [self finishWithError:[NSError
471 errorWithDomain:kGRPCErrorDomain
472 code:GRPCErrorCodeCancelled
473 userInfo:@{NSLocalizedDescriptionKey : @"Canceled by app"}]];
474 [_wrappedCall cancel];
476 _requestWriter.state = GRXWriterStateFinished;
480 [GRPCConnectivityMonitor unregisterObserver:self];
482 __block GRPCWrappedCall *wrappedCall = _wrappedCall;
483 dispatch_async(_callQueue, ^{
488 #pragma mark Read messages
490 // Only called from the call queue.
491 // The handler will be called from the network queue.
492 - (void)startReadWithHandler:(void (^)(grpc_byte_buffer *))handler {
493 // TODO(jcanizales): Add error handlers for async failures
494 [_wrappedCall startBatchWithOperations:@[ [[GRPCOpRecvMessage alloc] initWithHandler:handler] ]];
497 // Called initially from the network queue once response headers are received,
498 // then "recursively" from the responseWriteable queue after each response from the
499 // server has been written.
500 // If the call is currently paused, this is a noop. Restarting the call will invoke this
502 // TODO(jcanizales): Rename to readResponseIfNotPaused.
503 - (void)maybeStartNextRead {
504 @synchronized(self) {
505 if (_state != GRXWriterStateStarted) {
508 if (_callOptions.flowControlEnabled && (_pendingCoreRead || _pendingReceiveNextMessages == 0)) {
511 _pendingCoreRead = YES;
512 _pendingReceiveNextMessages--;
515 dispatch_async(_callQueue, ^{
516 __weak GRPCCall *weakSelf = self;
517 [self startReadWithHandler:^(grpc_byte_buffer *message) {
518 if (message == NULL) {
519 // No more messages from the server
522 __strong GRPCCall *strongSelf = weakSelf;
523 if (strongSelf == nil) {
524 grpc_byte_buffer_destroy(message);
527 NSData *data = [NSData grpc_dataWithByteBuffer:message];
528 grpc_byte_buffer_destroy(message);
530 // The app doesn't have enough memory to hold the server response. We
531 // don't want to throw, because the app shouldn't crash for a behavior
532 // that's on the hands of any server to have. Instead we finish and ask
533 // the server to cancel.
534 @synchronized(strongSelf) {
535 strongSelf->_pendingCoreRead = NO;
537 finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
538 code:GRPCErrorCodeResourceExhausted
540 NSLocalizedDescriptionKey :
541 @"Client does not have enough memory to "
542 @"hold the server response."
544 [strongSelf->_wrappedCall cancel];
546 strongSelf->_requestWriter.state = GRXWriterStateFinished;
548 @synchronized(strongSelf) {
549 [strongSelf->_responseWriteable enqueueValue:data
551 __strong GRPCCall *strongSelf = weakSelf;
553 @synchronized(strongSelf) {
554 strongSelf->_pendingCoreRead = NO;
555 [strongSelf maybeStartNextRead];
565 #pragma mark Send headers
567 - (void)sendHeaders {
568 // TODO (mxyan): Remove after deprecated methods are removed
569 uint32_t callSafetyFlags = 0;
570 switch (_callSafety) {
571 case GRPCCallSafetyDefault:
574 case GRPCCallSafetyIdempotentRequest:
575 callSafetyFlags = GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST;
577 case GRPCCallSafetyCacheableRequest:
578 callSafetyFlags = GRPC_INITIAL_METADATA_CACHEABLE_REQUEST;
582 NSMutableDictionary *headers = [_requestHeaders mutableCopy];
583 NSString *fetchedOauth2AccessToken;
584 @synchronized(self) {
585 fetchedOauth2AccessToken = _fetchedOauth2AccessToken;
587 if (fetchedOauth2AccessToken != nil) {
588 headers[@"authorization"] = [kBearerPrefix stringByAppendingString:fetchedOauth2AccessToken];
589 } else if (_callOptions.oauth2AccessToken != nil) {
590 headers[@"authorization"] =
591 [kBearerPrefix stringByAppendingString:_callOptions.oauth2AccessToken];
594 // TODO(jcanizales): Add error handlers for async failures
595 GRPCOpSendMetadata *op = [[GRPCOpSendMetadata alloc]
596 initWithMetadata:headers
597 flags:callSafetyFlags
598 handler:nil]; // No clean-up needed after SEND_INITIAL_METADATA
599 dispatch_async(_callQueue, ^{
600 if (!self->_unaryCall) {
601 [self->_wrappedCall startBatchWithOperations:@[ op ]];
603 [self->_unaryOpBatch addObject:op];
608 - (void)receiveNextMessages:(NSUInteger)numberOfMessages {
609 if (numberOfMessages == 0) {
612 @synchronized(self) {
613 _pendingReceiveNextMessages += numberOfMessages;
615 if (_state != GRXWriterStateStarted || !_callOptions.flowControlEnabled) {
618 [self maybeStartNextRead];
622 #pragma mark GRXWriteable implementation
624 // Only called from the call queue. The error handler will be called from the
625 // network queue if the write didn't succeed.
626 // If the call is a unary call, parameter \a errorHandler will be ignored and
627 // the error handler of GRPCOpSendClose will be executed in case of error.
628 - (void)writeMessage:(NSData *)message withErrorHandler:(void (^)(void))errorHandler {
629 __weak GRPCCall *weakSelf = self;
630 void (^resumingHandler)(void) = ^{
631 // Resume the request writer.
632 GRPCCall *strongSelf = weakSelf;
634 strongSelf->_requestWriter.state = GRXWriterStateStarted;
635 if (strongSelf->_writeDone) {
636 strongSelf->_writeDone();
640 GRPCOpSendMessage *op =
641 [[GRPCOpSendMessage alloc] initWithMessage:message handler:resumingHandler];
643 [_wrappedCall startBatchWithOperations:@[ op ] errorHandler:errorHandler];
645 // Ignored errorHandler since it is the same as the one for GRPCOpSendClose.
646 // TODO (mxyan): unify the error handlers of all Ops into a single closure.
647 [_unaryOpBatch addObject:op];
651 - (void)writeValue:(id)value {
652 NSAssert([value isKindOfClass:[NSData class]], @"value must be of type NSData");
654 @synchronized(self) {
655 if (_state == GRXWriterStateFinished) {
660 // Pause the input and only resume it when the C layer notifies us that writes
662 _requestWriter.state = GRXWriterStatePaused;
664 dispatch_async(_callQueue, ^{
665 // Write error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
666 [self writeMessage:value withErrorHandler:nil];
670 // Only called from the call queue. The error handler will be called from the
671 // network queue if the requests stream couldn't be closed successfully.
672 - (void)finishRequestWithErrorHandler:(void (^)(void))errorHandler {
674 [_wrappedCall startBatchWithOperations:@[ [[GRPCOpSendClose alloc] init] ]
675 errorHandler:errorHandler];
677 [_unaryOpBatch addObject:[[GRPCOpSendClose alloc] init]];
678 [_wrappedCall startBatchWithOperations:_unaryOpBatch errorHandler:errorHandler];
682 - (void)writesFinishedWithError:(NSError *)errorOrNil {
686 dispatch_async(_callQueue, ^{
687 // EOS error is not processed here. It is handled by op batch of GRPC_OP_RECV_STATUS_ON_CLIENT
688 [self finishRequestWithErrorHandler:nil];
695 // Both handlers will eventually be called, from the network queue. Writes can start immediately
697 // The first one (headersHandler), when the response headers are received.
698 // The second one (completionHandler), whenever the RPC finishes for any reason.
699 - (void)invokeCallWithHeadersHandler:(void (^)(NSDictionary *))headersHandler
700 completionHandler:(void (^)(NSError *, NSDictionary *))completionHandler {
701 dispatch_async(_callQueue, ^{
702 // TODO(jcanizales): Add error handlers for async failures
704 startBatchWithOperations:@[ [[GRPCOpRecvMetadata alloc] initWithHandler:headersHandler] ]];
706 startBatchWithOperations:@[ [[GRPCOpRecvStatus alloc] initWithHandler:completionHandler] ]];
711 __weak GRPCCall *weakSelf = self;
712 [self invokeCallWithHeadersHandler:^(NSDictionary *headers) {
713 // Response headers received.
714 __strong GRPCCall *strongSelf = weakSelf;
716 @synchronized(strongSelf) {
717 // it is ok to set nil because headers are only received once
718 strongSelf.responseHeaders = nil;
719 // copy the header so that the GRPCOpRecvMetadata object may be dealloc'ed
720 NSDictionary *copiedHeaders =
721 [[NSDictionary alloc] initWithDictionary:headers copyItems:YES];
722 strongSelf.responseHeaders = copiedHeaders;
723 strongSelf->_pendingCoreRead = NO;
724 [strongSelf maybeStartNextRead];
728 completionHandler:^(NSError *error, NSDictionary *trailers) {
729 __strong GRPCCall *strongSelf = weakSelf;
731 strongSelf.responseTrailers = trailers;
734 NSMutableDictionary *userInfo = [NSMutableDictionary dictionary];
735 if (error.userInfo) {
736 [userInfo addEntriesFromDictionary:error.userInfo];
738 userInfo[kGRPCTrailersKey] = strongSelf.responseTrailers;
739 // Since gRPC core does not guarantee the headers block being called before this block,
740 // responseHeaders might be nil.
741 userInfo[kGRPCHeadersKey] = strongSelf.responseHeaders;
742 error = [NSError errorWithDomain:error.domain code:error.code userInfo:userInfo];
744 [strongSelf finishWithError:error];
745 strongSelf->_requestWriter.state = GRXWriterStateFinished;
750 #pragma mark GRXWriter implementation
752 // Lock acquired inside startWithWriteable:
753 - (void)startCallWithWriteable:(id<GRXWriteable>)writeable {
754 @synchronized(self) {
755 if (_state == GRXWriterStateFinished) {
760 [[GRXConcurrentWriteable alloc] initWithWriteable:writeable dispatchQueue:_responseQueue];
762 GRPCPooledChannel *channel =
763 [[GRPCChannelPool sharedInstance] channelWithHost:_host callOptions:_callOptions];
764 _wrappedCall = [channel wrappedCallWithPath:_path
765 completionQueue:[GRPCCompletionQueue completionQueue]
766 callOptions:_callOptions];
768 if (_wrappedCall == nil) {
769 [self finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
770 code:GRPCErrorCodeUnavailable
772 NSLocalizedDescriptionKey :
773 @"Failed to create call or channel."
781 // Connectivity monitor is not required for CFStream
782 char *enableCFStream = getenv(kCFStreamVarName);
783 if (enableCFStream == nil || enableCFStream[0] != '1') {
784 [GRPCConnectivityMonitor registerObserver:self selector:@selector(connectivityChanged:)];
788 // Now that the RPC has been initiated, request writes can start.
789 [_requestWriter startWithWriteable:self];
792 - (void)startWithWriteable:(id<GRXWriteable>)writeable {
793 id<GRPCAuthorizationProtocol> tokenProvider = nil;
794 @synchronized(self) {
795 _state = GRXWriterStateStarted;
797 // Create a retain cycle so that this instance lives until the RPC finishes (or is cancelled).
798 // This makes RPCs in which the call isn't externally retained possible (as long as it is
799 // started before being autoreleased). Care is taken not to retain self strongly in any of the
800 // blocks used in this implementation, so that the life of the instance is determined by this
804 if (_callOptions == nil) {
805 GRPCMutableCallOptions *callOptions = [[GRPCHost callOptionsForHost:_host] mutableCopy];
806 if (_serverName.length != 0) {
807 callOptions.serverAuthority = _serverName;
810 callOptions.timeout = _timeout;
812 uint32_t callFlags = [GRPCCall callFlagsForHost:_host path:_path];
813 if (callFlags != 0) {
814 if (callFlags == GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST) {
815 _callSafety = GRPCCallSafetyIdempotentRequest;
816 } else if (callFlags == GRPC_INITIAL_METADATA_CACHEABLE_REQUEST) {
817 _callSafety = GRPCCallSafetyCacheableRequest;
821 id<GRPCAuthorizationProtocol> tokenProvider = self.tokenProvider;
822 if (tokenProvider != nil) {
823 callOptions.authTokenProvider = tokenProvider;
825 _callOptions = callOptions;
828 NSAssert(_callOptions.authTokenProvider == nil || _callOptions.oauth2AccessToken == nil,
829 @"authTokenProvider and oauth2AccessToken cannot be set at the same time");
831 tokenProvider = _callOptions.authTokenProvider;
834 if (tokenProvider != nil) {
835 __weak typeof(self) weakSelf = self;
836 [tokenProvider getTokenWithHandler:^(NSString *token) {
837 __strong typeof(self) strongSelf = weakSelf;
840 @synchronized(strongSelf) {
841 if (strongSelf->_state != GRXWriterStateFinished) {
844 strongSelf->_fetchedOauth2AccessToken = [token copy];
849 [strongSelf startCallWithWriteable:writeable];
854 [self startCallWithWriteable:writeable];
858 - (void)setState:(GRXWriterState)newState {
859 @synchronized(self) {
860 // Manual transitions are only allowed from the started or paused states.
861 if (_state == GRXWriterStateNotStarted || _state == GRXWriterStateFinished) {
866 case GRXWriterStateFinished:
868 // Per GRXWriter's contract, setting the state to Finished manually
869 // means one doesn't wish the writeable to be messaged anymore.
870 [_responseWriteable cancelSilently];
871 _responseWriteable = nil;
873 case GRXWriterStatePaused:
876 case GRXWriterStateStarted:
877 if (_state == GRXWriterStatePaused) {
879 [self maybeStartNextRead];
882 case GRXWriterStateNotStarted:
888 - (void)connectivityChanged:(NSNotification *)note {
889 // Cancel underlying call upon this notification.
891 // Retain because connectivity manager only keeps weak reference to GRPCCall.
892 __strong GRPCCall *strongSelf = self;
894 @synchronized(strongSelf) {
895 [_wrappedCall cancel];
897 finishWithError:[NSError errorWithDomain:kGRPCErrorDomain
898 code:GRPCErrorCodeUnavailable
900 NSLocalizedDescriptionKey : @"Connectivity lost."
903 strongSelf->_requestWriter.state = GRXWriterStateFinished;