3 * Copyright 2019 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 #import "GRPCCallInternal.h"
21 #import <GRPCClient/GRPCCall.h>
22 #import <RxLibrary/GRXBufferedPipe.h>
24 #import "GRPCCall+V2API.h"
26 @implementation GRPCCall2Internal {
27 /** Request for the call. */
28 GRPCRequestOptions *_requestOptions;
29 /** Options for the call. */
30 GRPCCallOptions *_callOptions;
31 /** The handler of responses. */
32 id<GRPCResponseHandler> _handler;
35 * Make use of legacy GRPCCall to make calls. Nullified when call is finished.
38 /** Flags whether initial metadata has been published to response handler. */
39 BOOL _initialMetadataPublished;
40 /** Streaming call writeable to the underlying call. */
41 GRXBufferedPipe *_pipe;
42 /** Serial dispatch queue for tasks inside the call. */
43 dispatch_queue_t _dispatchQueue;
44 /** Flags whether call has started. */
46 /** Flags whether call has been canceled. */
48 /** Flags whether call has been finished. */
50 /** The number of pending messages receiving requests. */
51 NSUInteger _pendingReceiveNextMessages;
54 - (instancetype)init {
55 if ((self = [super init])) {
56 // Set queue QoS only when iOS version is 8.0 or above and Xcode version is 9.0 or above
57 #if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
58 if (@available(iOS 8.0, macOS 10.10, *)) {
59 _dispatchQueue = dispatch_queue_create(
61 dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0));
66 _dispatchQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
68 _pipe = [GRXBufferedPipe pipe];
73 - (void)setResponseHandler:(id<GRPCResponseHandler>)responseHandler {
75 NSAssert(!_started, @"Call already started.");
79 _handler = responseHandler;
80 _initialMetadataPublished = NO;
87 - (dispatch_queue_t)requestDispatchQueue {
88 return _dispatchQueue;
91 - (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
92 callOptions:(GRPCCallOptions *)callOptions {
93 NSAssert(requestOptions.host.length != 0 && requestOptions.path.length != 0,
94 @"Neither host nor path can be nil.");
95 NSAssert(requestOptions.safety <= GRPCCallSafetyCacheableRequest, @"Invalid call safety value.");
96 if (requestOptions.host.length == 0 || requestOptions.path.length == 0) {
97 NSLog(@"Invalid host and path.");
100 if (requestOptions.safety > GRPCCallSafetyCacheableRequest) {
101 NSLog(@"Invalid call safety.");
105 @synchronized(self) {
106 NSAssert(_handler != nil, @"Response handler required.");
107 if (_handler == nil) {
108 NSLog(@"Invalid response handler.");
111 _requestOptions = requestOptions;
112 if (callOptions == nil) {
113 _callOptions = [[GRPCCallOptions alloc] init];
115 _callOptions = [callOptions copy];
123 GRPCCall *copiedCall = nil;
124 @synchronized(self) {
125 NSAssert(!_started, @"Call already started.");
126 NSAssert(!_canceled, @"Call already canceled.");
136 _call = [[GRPCCall alloc] initWithHost:_requestOptions.host
137 path:_requestOptions.path
138 callSafety:_requestOptions.safety
140 callOptions:_callOptions
142 @synchronized(self) {
143 if (self->_handler) {
144 [self issueDidWriteData];
148 [_call setResponseDispatchQueue:_dispatchQueue];
149 if (_callOptions.initialMetadata) {
150 [_call.requestHeaders addEntriesFromDictionary:_callOptions.initialMetadata];
152 if (_pendingReceiveNextMessages > 0) {
153 [_call receiveNextMessages:_pendingReceiveNextMessages];
154 _pendingReceiveNextMessages = 0;
159 void (^valueHandler)(id value) = ^(id value) {
160 @synchronized(self) {
161 if (self->_handler) {
162 if (!self->_initialMetadataPublished) {
163 self->_initialMetadataPublished = YES;
164 [self issueInitialMetadata:self->_call.responseHeaders];
167 [self issueMessage:value];
172 void (^completionHandler)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
173 @synchronized(self) {
174 if (self->_handler) {
175 if (!self->_initialMetadataPublished) {
176 self->_initialMetadataPublished = YES;
177 [self issueInitialMetadata:self->_call.responseHeaders];
179 [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
181 // Clearing _call must happen *after* dispatching close in order to get trailing
182 // metadata from _call.
184 // Clean up the request writers. This should have no effect to _call since its
185 // response writeable is already nullified.
186 [self->_pipe writesFinishedWithError:nil];
192 id<GRXWriteable> responseWriteable =
193 [[GRXWriteable alloc] initWithValueHandler:valueHandler completionHandler:completionHandler];
194 [copiedCall startWithWriteable:responseWriteable];
198 GRPCCall *copiedCall = nil;
199 @synchronized(self) {
210 if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
211 id<GRPCResponseHandler> copiedHandler = _handler;
213 dispatch_async(copiedHandler.dispatchQueue, ^{
214 [copiedHandler didCloseWithTrailingMetadata:nil
215 error:[NSError errorWithDomain:kGRPCErrorDomain
216 code:GRPCErrorCodeCancelled
218 NSLocalizedDescriptionKey :
229 - (void)writeData:(id)data {
230 GRXBufferedPipe *copiedPipe = nil;
231 @synchronized(self) {
232 NSAssert(!_canceled, @"Call already canceled.");
233 NSAssert(!_finished, @"Call is half-closed before sending data.");
245 [copiedPipe writeValue:data];
249 GRXBufferedPipe *copiedPipe = nil;
250 @synchronized(self) {
251 NSAssert(_started, @"Call not started.");
252 NSAssert(!_canceled, @"Call already canceled.");
253 NSAssert(!_finished, @"Call already half-closed.");
270 [copiedPipe writesFinishedWithError:nil];
273 - (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
274 @synchronized(self) {
275 if (initialMetadata != nil &&
276 [_handler respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
277 id<GRPCResponseHandler> copiedHandler = _handler;
278 dispatch_async(_handler.dispatchQueue, ^{
279 [copiedHandler didReceiveInitialMetadata:initialMetadata];
285 - (void)issueMessage:(id)message {
286 @synchronized(self) {
287 if (message != nil) {
288 if ([_handler respondsToSelector:@selector(didReceiveData:)]) {
289 id<GRPCResponseHandler> copiedHandler = _handler;
290 dispatch_async(_handler.dispatchQueue, ^{
291 [copiedHandler didReceiveData:message];
293 } else if ([_handler respondsToSelector:@selector(didReceiveRawMessage:)]) {
294 id<GRPCResponseHandler> copiedHandler = _handler;
295 dispatch_async(_handler.dispatchQueue, ^{
296 [copiedHandler didReceiveRawMessage:message];
303 - (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
304 @synchronized(self) {
305 if ([_handler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
306 id<GRPCResponseHandler> copiedHandler = _handler;
307 // Clean up _handler so that no more responses are reported to the handler.
309 dispatch_async(copiedHandler.dispatchQueue, ^{
310 [copiedHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
318 - (void)issueDidWriteData {
319 @synchronized(self) {
320 if (_callOptions.flowControlEnabled && [_handler respondsToSelector:@selector(didWriteData)]) {
321 id<GRPCResponseHandler> copiedHandler = _handler;
322 dispatch_async(copiedHandler.dispatchQueue, ^{
323 [copiedHandler didWriteData];
329 - (void)receiveNextMessages:(NSUInteger)numberOfMessages {
330 // branching based on _callOptions.flowControlEnabled is handled inside _call
331 GRPCCall *copiedCall = nil;
332 @synchronized(self) {
334 if (copiedCall == nil) {
335 _pendingReceiveNextMessages += numberOfMessages;
339 [copiedCall receiveNextMessages:numberOfMessages];