Imported Upstream version 1.22.0
[platform/upstream/grpc.git] / src / objective-c / GRPCClient / private / GRPCCallInternal.m
1 /*
2  *
3  * Copyright 2019 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18
19 #import "GRPCCallInternal.h"
20
21 #import <GRPCClient/GRPCCall.h>
22 #import <RxLibrary/GRXBufferedPipe.h>
23
24 #import "GRPCCall+V2API.h"
25
@implementation GRPCCall2Internal {
  // --- Configuration captured when the call is started ---

  /** Request options (host, path, safety) passed to -startWithRequestOptions:callOptions:. */
  GRPCRequestOptions *_requestOptions;
  /** Call options in effect: a copy of the caller's options, or a default instance if nil. */
  GRPCCallOptions *_callOptions;
  /** Receiver of response callbacks. Cleared when the close event is issued or on cancel. */
  id<GRPCResponseHandler> _handler;

  // --- Collaborators ---

  /** Legacy GRPCCall that carries out the RPC. Cleared on completion or cancellation. */
  GRPCCall *_call;
  /** Pipe through which request messages are written to _call. Cleared on finish or cancel. */
  GRXBufferedPipe *_pipe;
  /** Serial queue created in -init; also used as the response dispatch queue of _call. */
  dispatch_queue_t _dispatchQueue;

  // --- State flags, read and written under @synchronized(self) ---

  /** YES once the initial metadata has been handed over for delivery to the handler. */
  BOOL _initialMetadataPublished;
  /** YES once -start has created the underlying call. */
  BOOL _started;
  /** YES once -cancel has been invoked. */
  BOOL _canceled;
  /** YES once -finish has half-closed the request stream. */
  BOOL _finished;

  /** Receive requests accumulated while no underlying call exists; replayed by -start. */
  NSUInteger _pendingReceiveNextMessages;
}
53
/**
 * Creates the serial dispatch queue used by this call and the buffered pipe that carries
 * request messages.
 */
- (instancetype)init {
  self = [super init];
  if (self == nil) {
    return nil;
  }

  // Start from a plain serial queue attribute and upgrade it to a default-QoS serial attribute
  // only when the SDK is recent enough (iOS 11 / macOS 10.13 SDK or newer) and the OS supports
  // QoS classes (iOS 8.0 / macOS 10.10 or newer).
  dispatch_queue_attr_t queueAttributes = DISPATCH_QUEUE_SERIAL;
#if __IPHONE_OS_VERSION_MAX_ALLOWED >= 110000 || __MAC_OS_X_VERSION_MAX_ALLOWED >= 101300
  if (@available(iOS 8.0, macOS 10.10, *)) {
    queueAttributes =
        dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_DEFAULT, 0);
  }
#endif
  _dispatchQueue = dispatch_queue_create(NULL, queueAttributes);
  _pipe = [GRXBufferedPipe pipe];
  return self;
}
72
/**
 * Installs the handler that will receive response callbacks and resets the call state flags.
 *
 * Must be called before the call is started; once started, the request trips an assertion in
 * debug builds and is ignored otherwise.
 *
 * @param responseHandler The object to notify about metadata, messages and call closure.
 */
- (void)setResponseHandler:(id<GRPCResponseHandler>)responseHandler {
  @synchronized(self) {
    NSAssert(!_started, @"Call already started.");
    if (!_started) {
      _handler = responseHandler;
      _initialMetadataPublished = _started = _canceled = _finished = NO;
    }
  }
}
86
/**
 * @return The serial dispatch queue created for this call in -init.
 */
- (dispatch_queue_t)requestDispatchQueue {
  dispatch_queue_t callQueue = _dispatchQueue;
  return callQueue;
}
90
/**
 * Validates the request, records the request and call options, and starts the call.
 *
 * Invalid input (empty host or path, out-of-range safety value, missing response handler) trips
 * an assertion in debug builds; in builds without assertions it is logged and the call is not
 * started.
 *
 * @param requestOptions Host, path and safety of the request. Host and path must be non-empty.
 * @param callOptions    Options for the call. A copy is stored; when nil, a default-initialized
 *                       GRPCCallOptions instance is used instead.
 */
- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
                    callOptions:(GRPCCallOptions *)callOptions {
  BOOL hasHostAndPath = requestOptions.host.length != 0 && requestOptions.path.length != 0;
  BOOL hasValidSafety = requestOptions.safety <= GRPCCallSafetyCacheableRequest;
  NSAssert(hasHostAndPath, @"Neither host nor path can be nil.");
  NSAssert(hasValidSafety, @"Invalid call safety value.");
  if (!hasHostAndPath) {
    NSLog(@"Invalid host and path.");
    return;
  }
  if (!hasValidSafety) {
    NSLog(@"Invalid call safety.");
    return;
  }

  @synchronized(self) {
    NSAssert(_handler != nil, @"Response handler required.");
    if (_handler == nil) {
      NSLog(@"Invalid response handler.");
      return;
    }
    _requestOptions = requestOptions;
    // Keep a private copy so that later changes to the caller's object do not affect this call.
    _callOptions = callOptions ? [callOptions copy] : [[GRPCCallOptions alloc] init];
  }

  [self start];
}
121
/**
 * Creates the underlying legacy GRPCCall from the stored options and starts it.
 *
 * Does nothing (after asserting in debug builds) when the call was already started or canceled.
 * Receive requests that were buffered before the call existed are replayed onto the new call.
 * Responses are routed through a GRXWriteable whose handlers forward initial metadata, messages
 * and the close event to the response handler.
 */
- (void)start {
  GRPCCall *callToStart = nil;
  @synchronized(self) {
    NSAssert(!_started, @"Call already started.");
    NSAssert(!_canceled, @"Call already canceled.");
    if (_started || _canceled) {
      return;
    }
    _started = YES;

    // Invoked by the underlying call after a write completes; forwarded only while a handler is
    // still attached.
    void (^writeDoneHandler)(void) = ^{
      @synchronized(self) {
        if (self->_handler) {
          [self issueDidWriteData];
        }
      }
    };
    GRPCCall *legacyCall = [[GRPCCall alloc] initWithHost:_requestOptions.host
                                                     path:_requestOptions.path
                                               callSafety:_requestOptions.safety
                                           requestsWriter:_pipe
                                              callOptions:_callOptions
                                                writeDone:writeDoneHandler];
    _call = legacyCall;
    [legacyCall setResponseDispatchQueue:_dispatchQueue];

    NSDictionary *requestMetadata = _callOptions.initialMetadata;
    if (requestMetadata) {
      [legacyCall.requestHeaders addEntriesFromDictionary:requestMetadata];
    }

    // Replay receive requests that arrived before the underlying call existed.
    NSUInteger bufferedReceives = _pendingReceiveNextMessages;
    if (bufferedReceives > 0) {
      [legacyCall receiveNextMessages:bufferedReceives];
      _pendingReceiveNextMessages = 0;
    }
    callToStart = legacyCall;
  }

  // Publishes the response headers at most once. Must be invoked with the lock on self held and
  // only while a handler is attached.
  void (^publishInitialMetadataOnce)(void) = ^{
    if (self->_initialMetadataPublished) {
      return;
    }
    self->_initialMetadataPublished = YES;
    [self issueInitialMetadata:self->_call.responseHeaders];
  };

  void (^onValue)(id value) = ^(id value) {
    @synchronized(self) {
      if (self->_handler == nil) {
        return;
      }
      publishInitialMetadataOnce();
      if (value) {
        [self issueMessage:value];
      }
    }
  };

  void (^onCompletion)(NSError *errorOrNil) = ^(NSError *errorOrNil) {
    @synchronized(self) {
      if (self->_handler) {
        publishInitialMetadataOnce();
        [self issueClosedWithTrailingMetadata:self->_call.responseTrailers error:errorOrNil];
      }
      // _call may only be cleared *after* the close event has been issued, because the trailing
      // metadata is read from _call.
      if (self->_call) {
        // Finish the request writer as part of the cleanup; per the original note this should
        // not affect _call, whose response writeable is already nullified at this point.
        [self->_pipe writesFinishedWithError:nil];
        self->_call = nil;
        self->_pipe = nil;
      }
    }
  };

  id<GRXWriteable> responseWriteable =
      [[GRXWriteable alloc] initWithValueHandler:onValue completionHandler:onCompletion];
  // Started outside the lock, using the reference captured while the lock was held.
  [callToStart startWithWriteable:responseWriteable];
}
196
/**
 * Cancels the call.
 *
 * The first invocation marks the call as canceled, releases the underlying call, the request
 * pipe and the handler, and - when the handler implements didCloseWithTrailingMetadata:error: -
 * schedules that callback on the handler's dispatch queue with a GRPCErrorCodeCancelled error.
 * The underlying call is then canceled outside the lock. Later invocations do nothing.
 */
- (void)cancel {
  GRPCCall *callToCancel = nil;
  @synchronized(self) {
    if (_canceled) {
      return;
    }
    _canceled = YES;

    callToCancel = _call;
    _call = nil;
    _pipe = nil;

    // Detach the handler first so that no further events can reach it, then report the
    // cancellation through the detached reference.
    id<GRPCResponseHandler> closingHandler = _handler;
    _handler = nil;
    if ([closingHandler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(closingHandler.dispatchQueue, ^{
        NSDictionary *errorInfo = @{NSLocalizedDescriptionKey : @"Canceled by app"};
        NSError *cancelError = [NSError errorWithDomain:kGRPCErrorDomain
                                                   code:GRPCErrorCodeCancelled
                                               userInfo:errorInfo];
        [closingHandler didCloseWithTrailingMetadata:nil error:cancelError];
      });
    }
  }
  [callToCancel cancel];
}
228
/**
 * Writes one request message into the request pipe.
 *
 * Writing after the call was canceled or half-closed trips an assertion in debug builds and is
 * ignored otherwise.
 *
 * @param data The message to send.
 */
- (void)writeData:(id)data {
  GRXBufferedPipe *requestPipe = nil;
  @synchronized(self) {
    NSAssert(!_canceled, @"Call already canceled.");
    NSAssert(!_finished, @"Call is half-closed before sending data.");
    if (_canceled || _finished) {
      return;
    }
    requestPipe = _pipe;
  }
  // The write happens outside the lock; a nil pipe makes this a no-op.
  [requestPipe writeValue:data];
}
247
/**
 * Half-closes the call by finishing the request pipe without an error.
 *
 * Requires a started call that is neither canceled nor already finished; otherwise an assertion
 * trips in debug builds and the request is ignored.
 */
- (void)finish {
  GRXBufferedPipe *requestPipe = nil;
  @synchronized(self) {
    NSAssert(_started, @"Call not started.");
    NSAssert(!_canceled, @"Call already canceled.");
    NSAssert(!_finished, @"Call already half-closed.");
    if (!_started || _canceled || _finished) {
      return;
    }

    // Take the pipe out of the object so that it cannot be written to or finished again.
    requestPipe = _pipe;
    _pipe = nil;
    _finished = YES;
  }
  // Finished outside the lock; a nil pipe makes this a no-op.
  [requestPipe writesFinishedWithError:nil];
}
272
/**
 * Delivers the initial metadata to the handler on the handler's dispatch queue.
 *
 * Nothing is delivered when the metadata is nil or the handler does not implement
 * didReceiveInitialMetadata:.
 *
 * @param initialMetadata The response headers to deliver.
 */
- (void)issueInitialMetadata:(NSDictionary *)initialMetadata {
  @synchronized(self) {
    id<GRPCResponseHandler> receiver = _handler;
    if (initialMetadata == nil ||
        ![receiver respondsToSelector:@selector(didReceiveInitialMetadata:)]) {
      return;
    }
    dispatch_async(receiver.dispatchQueue, ^{
      [receiver didReceiveInitialMetadata:initialMetadata];
    });
  }
}
284
/**
 * Delivers a response message to the handler on the handler's dispatch queue.
 *
 * didReceiveData: is used when the handler implements it; otherwise didReceiveRawMessage: is
 * used when available. A nil message is dropped.
 *
 * @param message The response message to deliver.
 */
- (void)issueMessage:(id)message {
  @synchronized(self) {
    if (message == nil) {
      return;
    }
    id<GRPCResponseHandler> receiver = _handler;
    if ([receiver respondsToSelector:@selector(didReceiveData:)]) {
      dispatch_async(receiver.dispatchQueue, ^{
        [receiver didReceiveData:message];
      });
    } else if ([receiver respondsToSelector:@selector(didReceiveRawMessage:)]) {
      dispatch_async(receiver.dispatchQueue, ^{
        [receiver didReceiveRawMessage:message];
      });
    }
  }
}
302
/**
 * Reports call closure to the handler and detaches the handler.
 *
 * The handler is always cleared so that no further events are reported to it. The close
 * callback is scheduled on the handler's dispatch queue only when the handler implements
 * didCloseWithTrailingMetadata:error:.
 *
 * @param trailingMetadata The response trailers to deliver.
 * @param error            The error that ended the call, or nil.
 */
- (void)issueClosedWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
  @synchronized(self) {
    id<GRPCResponseHandler> closingHandler = _handler;
    _handler = nil;
    if ([closingHandler respondsToSelector:@selector(didCloseWithTrailingMetadata:error:)]) {
      dispatch_async(closingHandler.dispatchQueue, ^{
        [closingHandler didCloseWithTrailingMetadata:trailingMetadata error:error];
      });
    }
  }
}
317
/**
 * Notifies the handler, on its dispatch queue, that a write has completed.
 *
 * The notification is sent only when flow control is enabled in the call options and the
 * handler implements didWriteData.
 */
- (void)issueDidWriteData {
  @synchronized(self) {
    if (!_callOptions.flowControlEnabled) {
      return;
    }
    id<GRPCResponseHandler> receiver = _handler;
    if (![receiver respondsToSelector:@selector(didWriteData)]) {
      return;
    }
    dispatch_async(receiver.dispatchQueue, ^{
      [receiver didWriteData];
    });
  }
}
328
/**
 * Requests delivery of additional response messages.
 *
 * When an underlying call exists the request is forwarded to it; branching on
 * _callOptions.flowControlEnabled is handled inside that call. When no underlying call exists,
 * the count is added to a pending total that -start replays onto the call it creates.
 *
 * @param numberOfMessages The number of additional messages to receive.
 */
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
  GRPCCall *activeCall = nil;
  @synchronized(self) {
    activeCall = _call;
    if (activeCall == nil) {
      _pendingReceiveNextMessages += numberOfMessages;
    }
  }
  // Forwarded outside the lock. When the request was buffered above, activeCall is nil and this
  // message send is a no-op.
  [activeCall receiveNextMessages:numberOfMessages];
}
341
342 @end