78c7f3ad5bb4b3622f0c86a6ddeb2df9774620eb
[platform/upstream/grpc.git] / src / csharp / Grpc.Core.Tests / Internal / AsyncCallTest.cs
1 #region Copyright notice and license
2
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16
17 #endregion
18
19 using System;
20 using System.Collections.Generic;
21 using System.Threading.Tasks;
22
23 using Grpc.Core.Internal;
24 using Grpc.Core.Utils;
25 using NUnit.Framework;
26
27 namespace Grpc.Core.Internal.Tests
28 {
29     /// <summary>
30     /// Uses fake native call to test interaction of <c>AsyncCall</c> wrapping code with C core in different situations.
31     /// </summary>
32     public class AsyncCallTest
33     {
34         Channel channel;
35         FakeNativeCall fakeCall;
36         AsyncCall<string, string> asyncCall;
37         FakeBufferReaderManager fakeBufferReaderManager;
38
39         [SetUp]
40         public void Init()
41         {
42             channel = new Channel("localhost", ChannelCredentials.Insecure);
43
44             fakeCall = new FakeNativeCall();
45
46             var callDetails = new CallInvocationDetails<string, string>(channel, "someMethod", null, Marshallers.StringMarshaller, Marshallers.StringMarshaller, new CallOptions());
47             asyncCall = new AsyncCall<string, string>(callDetails, fakeCall);
48             fakeBufferReaderManager = new FakeBufferReaderManager();
49         }
50
51         [TearDown]
52         public void Cleanup()
53         {
54             channel.ShutdownAsync().Wait();
55             fakeBufferReaderManager.Dispose();
56         }
57
58         [Test]
59         public void AsyncUnary_CanBeStartedOnlyOnce()
60         {
61             asyncCall.UnaryCallAsync("request1");
62             Assert.Throws(typeof(InvalidOperationException),
63                 () => asyncCall.UnaryCallAsync("abc"));
64         }
65
66         [Test]
67         public void AsyncUnary_StreamingOperationsNotAllowed()
68         {
69             asyncCall.UnaryCallAsync("request1");
70             Assert.ThrowsAsync(typeof(InvalidOperationException),
71                 async () => await asyncCall.ReadMessageAsync());
72             Assert.Throws(typeof(InvalidOperationException),
73                 () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
74         }
75
76         [Test]
77         public void AsyncUnary_Success()
78         {
79             var resultTask = asyncCall.UnaryCallAsync("request1");
80             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
81                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
82                 CreateResponsePayload(),
83                 new Metadata());
84
85             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
86         }
87
88         [Test]
89         public void AsyncUnary_NonSuccessStatusCode()
90         {
91             var resultTask = asyncCall.UnaryCallAsync("request1");
92             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
93                 CreateClientSideStatus(StatusCode.InvalidArgument),
94                 CreateNullResponse(),
95                 new Metadata());
96
97             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
98         }
99
100         [Test]
101         public void AsyncUnary_NullResponsePayload()
102         {
103             var resultTask = asyncCall.UnaryCallAsync("request1");
104             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
105                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
106                 null,
107                 new Metadata());
108
109             // failure to deserialize will result in InvalidArgument status.
110             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
111         }
112
113         [Test]
114         public void AsyncUnary_RequestSerializationExceptionDoesntLeakResources()
115         {
116             string nullRequest = null;  // will throw when serializing
117             Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCallAsync(nullRequest));
118             Assert.AreEqual(0, channel.GetCallReferenceCount());
119             Assert.IsTrue(fakeCall.IsDisposed);
120         }
121
122         [Test]
123         public void AsyncUnary_StartCallFailureDoesntLeakResources()
124         {
125             fakeCall.MakeStartCallFail();
126             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCallAsync("request1"));
127             Assert.AreEqual(0, channel.GetCallReferenceCount());
128             Assert.IsTrue(fakeCall.IsDisposed);
129         }
130
131         [Test]
132         public void SyncUnary_RequestSerializationExceptionDoesntLeakResources()
133         {
134             string nullRequest = null;  // will throw when serializing
135             Assert.Throws(typeof(ArgumentNullException), () => asyncCall.UnaryCall(nullRequest));
136             Assert.AreEqual(0, channel.GetCallReferenceCount());
137             Assert.IsTrue(fakeCall.IsDisposed);
138         }
139
140         [Test]
141         public void SyncUnary_StartCallFailureDoesntLeakResources()
142         {
143             fakeCall.MakeStartCallFail();
144             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.UnaryCall("request1"));
145             Assert.AreEqual(0, channel.GetCallReferenceCount());
146             Assert.IsTrue(fakeCall.IsDisposed);
147         }
148
149         [Test]
150         public void ClientStreaming_StreamingReadNotAllowed()
151         {
152             asyncCall.ClientStreamingCallAsync();
153             Assert.ThrowsAsync(typeof(InvalidOperationException),
154                 async () => await asyncCall.ReadMessageAsync());
155         }
156
157         [Test]
158         public void ClientStreaming_NoRequest_Success()
159         {
160             var resultTask = asyncCall.ClientStreamingCallAsync();
161             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
162                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
163                 CreateResponsePayload(),
164                 new Metadata());
165
166             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
167         }
168
169         [Test]
170         public void ClientStreaming_NoRequest_NonSuccessStatusCode()
171         {
172             var resultTask = asyncCall.ClientStreamingCallAsync();
173             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
174                 CreateClientSideStatus(StatusCode.InvalidArgument),
175                 CreateNullResponse(),
176                 new Metadata());
177
178             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.InvalidArgument);
179         }
180
181         [Test]
182         public void ClientStreaming_MoreRequests_Success()
183         {
184             var resultTask = asyncCall.ClientStreamingCallAsync();
185             var requestStream = new ClientRequestStream<string, string>(asyncCall);
186
187             var writeTask = requestStream.WriteAsync("request1");
188             fakeCall.SendCompletionCallback.OnSendCompletion(true);
189             writeTask.Wait();
190
191             var writeTask2 = requestStream.WriteAsync("request2");
192             fakeCall.SendCompletionCallback.OnSendCompletion(true);
193             writeTask2.Wait();
194
195             var completeTask = requestStream.CompleteAsync();
196             fakeCall.SendCompletionCallback.OnSendCompletion(true);
197             completeTask.Wait();
198
199             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
200                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
201                 CreateResponsePayload(),
202                 new Metadata());
203
204             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
205         }
206
207         [Test]
208         public void ClientStreaming_WriteFailureThrowsRpcException()
209         {
210             var resultTask = asyncCall.ClientStreamingCallAsync();
211             var requestStream = new ClientRequestStream<string, string>(asyncCall);
212
213             var writeTask = requestStream.WriteAsync("request1");
214             fakeCall.SendCompletionCallback.OnSendCompletion(false);
215
216             // The write will wait for call to finish to receive the status code.
217             Assert.IsFalse(writeTask.IsCompleted);
218
219             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
220                 CreateClientSideStatus(StatusCode.Internal),
221                 CreateNullResponse(),
222                 new Metadata());
223
224             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
225             Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
226
227             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
228         }
229
230         [Test]
231         public void ClientStreaming_WriteFailureThrowsRpcException2()
232         {
233             var resultTask = asyncCall.ClientStreamingCallAsync();
234             var requestStream = new ClientRequestStream<string, string>(asyncCall);
235
236             var writeTask = requestStream.WriteAsync("request1");
237
238             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
239                 CreateClientSideStatus(StatusCode.Internal),
240                 CreateNullResponse(),
241                 new Metadata());
242
243             fakeCall.SendCompletionCallback.OnSendCompletion(false);
244
245             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
246             Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
247
248             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
249         }
250
251         [Test]
252         public void ClientStreaming_WriteFailureThrowsRpcException3()
253         {
254             var resultTask = asyncCall.ClientStreamingCallAsync();
255             var requestStream = new ClientRequestStream<string, string>(asyncCall);
256
257             var writeTask = requestStream.WriteAsync("request1");
258             fakeCall.SendCompletionCallback.OnSendCompletion(false);
259
260             // Until the delayed write completion has been triggered,
261             // we still act as if there was an active write.
262             Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request2"));
263
264             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
265                 CreateClientSideStatus(StatusCode.Internal),
266                 CreateNullResponse(),
267                 new Metadata());
268
269             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
270             Assert.AreEqual(StatusCode.Internal, ex.Status.StatusCode);
271
272             // Following attempts to write keep delivering the same status
273             var ex2 = Assert.ThrowsAsync<RpcException>(async () => await requestStream.WriteAsync("after call has finished"));
274             Assert.AreEqual(StatusCode.Internal, ex2.Status.StatusCode);
275
276             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Internal);
277         }
278
279         [Test]
280         public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException()
281         {
282             var resultTask = asyncCall.ClientStreamingCallAsync();
283             var requestStream = new ClientRequestStream<string, string>(asyncCall);
284
285             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
286                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
287                 CreateResponsePayload(),
288                 new Metadata());
289
290             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
291
292             var writeTask = requestStream.WriteAsync("request1");
293             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
294             Assert.AreEqual(Status.DefaultSuccess, ex.Status);
295         }
296
297         [Test]
298         public void ClientStreaming_WriteAfterReceivingStatusThrowsRpcException2()
299         {
300             var resultTask = asyncCall.ClientStreamingCallAsync();
301             var requestStream = new ClientRequestStream<string, string>(asyncCall);
302
303             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
304                 new ClientSideStatus(new Status(StatusCode.OutOfRange, ""), new Metadata()),
305                 CreateResponsePayload(),
306                 new Metadata());
307
308             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.OutOfRange);
309
310             var writeTask = requestStream.WriteAsync("request1");
311             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
312             Assert.AreEqual(StatusCode.OutOfRange, ex.Status.StatusCode);
313         }
314
315         [Test]
316         public void ClientStreaming_WriteAfterCompleteThrowsInvalidOperationException()
317         {
318             var resultTask = asyncCall.ClientStreamingCallAsync();
319             var requestStream = new ClientRequestStream<string, string>(asyncCall);
320
321             requestStream.CompleteAsync();
322
323             Assert.Throws(typeof(InvalidOperationException), () => requestStream.WriteAsync("request1"));
324
325             fakeCall.SendCompletionCallback.OnSendCompletion(true);
326
327             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
328                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
329                 CreateResponsePayload(),
330                 new Metadata());
331
332             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
333         }
334
335         [Test]
336         public void ClientStreaming_CompleteAfterReceivingStatusSucceeds()
337         {
338             var resultTask = asyncCall.ClientStreamingCallAsync();
339             var requestStream = new ClientRequestStream<string, string>(asyncCall);
340
341             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
342                 new ClientSideStatus(Status.DefaultSuccess, new Metadata()),
343                 CreateResponsePayload(),
344                 new Metadata());
345
346             AssertUnaryResponseSuccess(asyncCall, fakeCall, resultTask);
347             Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
348         }
349
350         [Test]
351         public void ClientStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
352         {
353             var resultTask = asyncCall.ClientStreamingCallAsync();
354             var requestStream = new ClientRequestStream<string, string>(asyncCall);
355
356             asyncCall.Cancel();
357             Assert.IsTrue(fakeCall.IsCancelled);
358
359             var writeTask = requestStream.WriteAsync("request1");
360             Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
361
362             fakeCall.UnaryResponseClientCallback.OnUnaryResponseClient(true,
363                 CreateClientSideStatus(StatusCode.Cancelled),
364                 CreateNullResponse(),
365                 new Metadata());
366
367             AssertUnaryResponseError(asyncCall, fakeCall, resultTask, StatusCode.Cancelled);
368         }
369
370         [Test]
371         public void ClientStreaming_StartCallFailureDoesntLeakResources()
372         {
373             fakeCall.MakeStartCallFail();
374             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.ClientStreamingCallAsync());
375             Assert.AreEqual(0, channel.GetCallReferenceCount());
376             Assert.IsTrue(fakeCall.IsDisposed);
377         }
378
379         [Test]
380         public void ServerStreaming_StreamingSendNotAllowed()
381         {
382             asyncCall.StartServerStreamingCall("request1");
383             Assert.Throws(typeof(InvalidOperationException),
384                 () => asyncCall.SendMessageAsync("abc", new WriteFlags()));
385         }
386
387         [Test]
388         public void ServerStreaming_NoResponse_Success1()
389         {
390             asyncCall.StartServerStreamingCall("request1");
391             var responseStream = new ClientResponseStream<string, string>(asyncCall);
392             var readTask = responseStream.MoveNext();
393
394             fakeCall.ReceivedResponseHeadersCallback.OnReceivedResponseHeaders(true, new Metadata());
395             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
396
397             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
398             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
399
400             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
401         }
402
403         [Test]
404         public void ServerStreaming_NoResponse_Success2()
405         {
406             asyncCall.StartServerStreamingCall("request1");
407             var responseStream = new ClientResponseStream<string, string>(asyncCall);
408             var readTask = responseStream.MoveNext();
409
410             // try alternative order of completions
411             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
412             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
413
414             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
415         }
416
417         [Test]
418         public void ServerStreaming_NoResponse_ReadFailure()
419         {
420             asyncCall.StartServerStreamingCall("request1");
421             var responseStream = new ClientResponseStream<string, string>(asyncCall);
422             var readTask = responseStream.MoveNext();
423
424             fakeCall.ReceivedMessageCallback.OnReceivedMessage(false, CreateNullResponse());  // after a failed read, we rely on C core to deliver appropriate status code.
425             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Internal));
426
427             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Internal);
428         }
429
430         [Test]
431         public void ServerStreaming_MoreResponses_Success()
432         {
433             asyncCall.StartServerStreamingCall("request1");
434             var responseStream = new ClientResponseStream<string, string>(asyncCall);
435
436             var readTask1 = responseStream.MoveNext();
437             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
438             Assert.IsTrue(readTask1.Result);
439             Assert.AreEqual("response1", responseStream.Current);
440
441             var readTask2 = responseStream.MoveNext();
442             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
443             Assert.IsTrue(readTask2.Result);
444             Assert.AreEqual("response1", responseStream.Current);
445
446             var readTask3 = responseStream.MoveNext();
447             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
448             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
449
450             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask3);
451         }
452
453         [Test]
454         public void ServerStreaming_RequestSerializationExceptionDoesntLeakResources()
455         {
456             string nullRequest = null;  // will throw when serializing
457             Assert.Throws(typeof(ArgumentNullException), () => asyncCall.StartServerStreamingCall(nullRequest));
458             Assert.AreEqual(0, channel.GetCallReferenceCount());
459             Assert.IsTrue(fakeCall.IsDisposed);
460
461             var responseStream = new ClientResponseStream<string, string>(asyncCall);
462             var readTask = responseStream.MoveNext();
463         }
464
465         [Test]
466         public void ServerStreaming_StartCallFailureDoesntLeakResources()
467         {
468             fakeCall.MakeStartCallFail();
469             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartServerStreamingCall("request1"));
470             Assert.AreEqual(0, channel.GetCallReferenceCount());
471             Assert.IsTrue(fakeCall.IsDisposed);
472         }
473
474         [Test]
475         public void DuplexStreaming_NoRequestNoResponse_Success()
476         {
477             asyncCall.StartDuplexStreamingCall();
478             var requestStream = new ClientRequestStream<string, string>(asyncCall);
479             var responseStream = new ClientResponseStream<string, string>(asyncCall);
480
481             var writeTask1 = requestStream.CompleteAsync();
482             fakeCall.SendCompletionCallback.OnSendCompletion(true);
483             Assert.DoesNotThrowAsync(async () => await writeTask1);
484
485             var readTask = responseStream.MoveNext();
486             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
487             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
488
489             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
490         }
491
492         [Test]
493         public void DuplexStreaming_WriteAfterReceivingStatusThrowsRpcException()
494         {
495             asyncCall.StartDuplexStreamingCall();
496             var requestStream = new ClientRequestStream<string, string>(asyncCall);
497             var responseStream = new ClientResponseStream<string, string>(asyncCall);
498
499             var readTask = responseStream.MoveNext();
500             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
501             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
502
503             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
504
505             var writeTask = requestStream.WriteAsync("request1");
506             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
507             Assert.AreEqual(Status.DefaultSuccess, ex.Status);
508         }
509
510         [Test]
511         public void DuplexStreaming_CompleteAfterReceivingStatusSuceeds()
512         {
513             asyncCall.StartDuplexStreamingCall();
514             var requestStream = new ClientRequestStream<string, string>(asyncCall);
515             var responseStream = new ClientResponseStream<string, string>(asyncCall);
516
517             var readTask = responseStream.MoveNext();
518             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
519             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, new ClientSideStatus(Status.DefaultSuccess, new Metadata()));
520
521             AssertStreamingResponseSuccess(asyncCall, fakeCall, readTask);
522
523             Assert.DoesNotThrowAsync(async () => await requestStream.CompleteAsync());
524         }
525
526         [Test]
527         public void DuplexStreaming_WriteFailureThrowsRpcException()
528         {
529             asyncCall.StartDuplexStreamingCall();
530             var requestStream = new ClientRequestStream<string, string>(asyncCall);
531             var responseStream = new ClientResponseStream<string, string>(asyncCall);
532
533             var writeTask = requestStream.WriteAsync("request1");
534             fakeCall.SendCompletionCallback.OnSendCompletion(false);
535
536             // The write will wait for call to finish to receive the status code.
537             Assert.IsFalse(writeTask.IsCompleted);
538
539             var readTask = responseStream.MoveNext();
540             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
541             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
542
543             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
544             Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
545
546             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
547         }
548
549         [Test]
550         public void DuplexStreaming_WriteFailureThrowsRpcException2()
551         {
552             asyncCall.StartDuplexStreamingCall();
553             var requestStream = new ClientRequestStream<string, string>(asyncCall);
554             var responseStream = new ClientResponseStream<string, string>(asyncCall);
555
556             var writeTask = requestStream.WriteAsync("request1");
557
558             var readTask = responseStream.MoveNext();
559             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
560             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.PermissionDenied));
561             fakeCall.SendCompletionCallback.OnSendCompletion(false);
562
563             var ex = Assert.ThrowsAsync<RpcException>(async () => await writeTask);
564             Assert.AreEqual(StatusCode.PermissionDenied, ex.Status.StatusCode);
565
566             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.PermissionDenied);
567         }
568
569         [Test]
570         public void DuplexStreaming_WriteAfterCancellationRequestThrowsTaskCanceledException()
571         {
572             asyncCall.StartDuplexStreamingCall();
573             var requestStream = new ClientRequestStream<string, string>(asyncCall);
574             var responseStream = new ClientResponseStream<string, string>(asyncCall);
575
576             asyncCall.Cancel();
577             Assert.IsTrue(fakeCall.IsCancelled);
578
579             var writeTask = requestStream.WriteAsync("request1");
580             Assert.ThrowsAsync(typeof(TaskCanceledException), async () => await writeTask);
581
582             var readTask = responseStream.MoveNext();
583             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
584             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
585
586             AssertStreamingResponseError(asyncCall, fakeCall, readTask, StatusCode.Cancelled);
587         }
588
589         [Test]
590         public void DuplexStreaming_ReadAfterCancellationRequestCanSucceed()
591         {
592             asyncCall.StartDuplexStreamingCall();
593             var responseStream = new ClientResponseStream<string, string>(asyncCall);
594
595             asyncCall.Cancel();
596             Assert.IsTrue(fakeCall.IsCancelled);
597
598             var readTask1 = responseStream.MoveNext();
599             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
600             Assert.IsTrue(readTask1.Result);
601             Assert.AreEqual("response1", responseStream.Current);
602
603             var readTask2 = responseStream.MoveNext();
604             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
605             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
606
607             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
608         }
609
610         [Test]
611         public void DuplexStreaming_ReadStartedBeforeCancellationRequestCanSucceed()
612         {
613             asyncCall.StartDuplexStreamingCall();
614             var responseStream = new ClientResponseStream<string, string>(asyncCall);
615
616             var readTask1 = responseStream.MoveNext();  // initiate the read before cancel request
617             asyncCall.Cancel();
618             Assert.IsTrue(fakeCall.IsCancelled);
619
620             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateResponsePayload());
621             Assert.IsTrue(readTask1.Result);
622             Assert.AreEqual("response1", responseStream.Current);
623
624             var readTask2 = responseStream.MoveNext();
625             fakeCall.ReceivedMessageCallback.OnReceivedMessage(true, CreateNullResponse());
626             fakeCall.ReceivedStatusOnClientCallback.OnReceivedStatusOnClient(true, CreateClientSideStatus(StatusCode.Cancelled));
627
628             AssertStreamingResponseError(asyncCall, fakeCall, readTask2, StatusCode.Cancelled);
629         }
630
631         [Test]
632         public void DuplexStreaming_StartCallFailureDoesntLeakResources()
633         {
634             fakeCall.MakeStartCallFail();
635             Assert.Throws(typeof(InvalidOperationException), () => asyncCall.StartDuplexStreamingCall());
636             Assert.AreEqual(0, channel.GetCallReferenceCount());
637             Assert.IsTrue(fakeCall.IsDisposed);
638         }
639
640         ClientSideStatus CreateClientSideStatus(StatusCode statusCode)
641         {
642             return new ClientSideStatus(new Status(statusCode, ""), new Metadata());
643         }
644
645         IBufferReader CreateResponsePayload()
646         {
647             return fakeBufferReaderManager.CreateSingleSegmentBufferReader(Marshallers.StringMarshaller.Serializer("response1"));
648         }
649
650         IBufferReader CreateNullResponse()
651         {
652             return fakeBufferReaderManager.CreateNullPayloadBufferReader();
653         }
654
655         static void AssertUnaryResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask)
656         {
657             Assert.IsTrue(resultTask.IsCompleted);
658             Assert.IsTrue(fakeCall.IsDisposed);
659
660             Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
661             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
662             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
663             Assert.AreEqual("response1", resultTask.Result);
664         }
665
666         static void AssertStreamingResponseSuccess(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask)
667         {
668             Assert.IsTrue(moveNextTask.IsCompleted);
669             Assert.IsTrue(fakeCall.IsDisposed);
670
671             Assert.IsFalse(moveNextTask.Result);
672             Assert.AreEqual(Status.DefaultSuccess, asyncCall.GetStatus());
673             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
674         }
675
676         static void AssertUnaryResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<string> resultTask, StatusCode expectedStatusCode)
677         {
678             Assert.IsTrue(resultTask.IsCompleted);
679             Assert.IsTrue(fakeCall.IsDisposed);
680
681             Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
682             var ex = Assert.ThrowsAsync<RpcException>(async () => await resultTask);
683             Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
684             Assert.AreEqual(0, asyncCall.ResponseHeadersAsync.Result.Count);
685             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
686         }
687
688         static void AssertStreamingResponseError(AsyncCall<string, string> asyncCall, FakeNativeCall fakeCall, Task<bool> moveNextTask, StatusCode expectedStatusCode)
689         {
690             Assert.IsTrue(moveNextTask.IsCompleted);
691             Assert.IsTrue(fakeCall.IsDisposed);
692
693             var ex = Assert.ThrowsAsync<RpcException>(async () => await moveNextTask);
694             Assert.AreEqual(expectedStatusCode, ex.Status.StatusCode);
695             Assert.AreEqual(expectedStatusCode, asyncCall.GetStatus().StatusCode);
696             Assert.AreEqual(0, asyncCall.GetTrailers().Count);
697         }
698     }
699 }