Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / csharp / Grpc.HealthCheck / HealthServiceImpl.cs
1 #region Copyright notice and license
2 // Copyright 2015 gRPC authors.
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //     http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 #endregion
16
17 using System;
18 using System.Collections.Generic;
19 using System.Linq;
20 #if GRPC_SUPPORT_WATCH
21 using System.Threading.Channels;
22 #endif
23 using System.Threading.Tasks;
24
25 using Grpc.Core;
26 using Grpc.Health.V1;
27
namespace Grpc.HealthCheck
{
    /// <summary>
    /// Implementation of a simple Health service. Useful for health checking.
    ///
    /// Registering service with a server:
    /// <code>
    /// var serviceImpl = new HealthServiceImpl();
    /// server = new Server();
    /// server.AddServiceDefinition(Grpc.Health.V1.Health.BindService(serviceImpl));
    /// </code>
    /// </summary>
    public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
    {
        // The maximum number of statuses to buffer on the server.
        internal const int MaxStatusBufferSize = 5;

        // Guards statusMap. When both locks are needed, statusLock is always taken
        // before watchersLock; no code path acquires them in the opposite order.
        private readonly object statusLock = new object();
        private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
            new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();

#if GRPC_SUPPORT_WATCH
        // Guards watchers: for each service name, the channel writers of the active Watch calls.
        private readonly object watchersLock = new object();
        private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
            new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();
#endif

        /// <summary>
        /// Sets the health status for given service.
        /// Active watchers of the service are notified only when the status actually changes.
        /// </summary>
        /// <param name="service">The service. Cannot be null.</param>
        /// <param name="status">the health status</param>
        public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service);
                statusMap[service] = status;

#if GRPC_SUPPORT_WATCH
                // Notify while still holding statusLock so that concurrent updates are queued
                // to watchers in the same order in which they were applied to statusMap.
                if (status != previousStatus)
                {
                    NotifyStatus(service, status);
                }
#endif
            }
        }

        /// <summary>
        /// Clears health status for given service.
        /// Active watchers are notified with ServiceUnknown if the service previously had a known status.
        /// </summary>
        /// <param name="service">The service. Cannot be null.</param>
        public void ClearStatus(string service)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus previousStatus = GetServiceStatus(service);
                statusMap.Remove(service);

#if GRPC_SUPPORT_WATCH
                // Notified under statusLock to keep notification order consistent with statusMap.
                if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
                {
                    NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
                }
#endif
            }
        }

        /// <summary>
        /// Clears statuses for all services.
        /// Active watchers of every service that had a known status are notified with ServiceUnknown.
        /// </summary>
        public void ClearAll()
        {
            lock (statusLock)
            {
                // Snapshot the entries before clearing so watchers can be told what was removed.
                List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses = statusMap.ToList();
                statusMap.Clear();

#if GRPC_SUPPORT_WATCH
                // Notified under statusLock to keep notification order consistent with statusMap.
                foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
                {
                    if (status.Value != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
                    {
                        NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
                    }
                }
#endif
            }
        }

        /// <summary>
        /// Performs a health status check.
        /// </summary>
        /// <param name="request">The check request.</param>
        /// <param name="context">The call context.</param>
        /// <returns>The asynchronous response.</returns>
        /// <exception cref="RpcException">
        /// Thrown with <see cref="StatusCode.NotFound"/> when no status has been set for the requested service.
        /// </exception>
        public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
        {
            HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true);

            return Task.FromResult(response);
        }

#if GRPC_SUPPORT_WATCH
        /// <summary>
        /// Performs a watch for the serving status of the requested service.
        /// The server will immediately send back a message indicating the current
        /// serving status.  It will then subsequently send a new message whenever
        /// the service's serving status changes.
        ///
        /// If the requested service is unknown when the call is received, the
        /// server will send a message setting the serving status to
        /// SERVICE_UNKNOWN but will *not* terminate the call.  If at some
        /// future point, the serving status of the service becomes known, the
        /// server will send a new message with the service's serving status.
        ///
        /// If the call terminates with status UNIMPLEMENTED, then clients
        /// should assume this method is not supported and should not retry the
        /// call.  If the call terminates with any other status (including OK),
        /// clients should retry the call with appropriate exponential backoff.
        /// </summary>
        /// <param name="request">The request received from the client.</param>
        /// <param name="responseStream">Used for sending responses back to the client.</param>
        /// <param name="context">The context of the server-side call handler being invoked.</param>
        /// <returns>A task indicating completion of the handler.</returns>
        public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
        {
            string service = request.Service;

            // Channel is used to marshal multiple callers updating status into a single queue.
            // This is required because IServerStreamWriter is not thread safe.
            //
            // A queue of unwritten statuses could build up if flow control causes responseStream.WriteAsync to await.
            // When this number is exceeded the server will discard older statuses. The discarded intermediate statuses
            // will never be sent to the client.
            Channel<HealthCheckResponse> channel = Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(capacity: MaxStatusBufferSize) {
                SingleReader = true,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.DropOldest
            });

            lock (watchersLock)
            {
                if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    channelWriters = new List<ChannelWriter<HealthCheckResponse>>();
                    watchers.Add(service, channelWriters);
                }

                channelWriters.Add(channel.Writer);
            }

            try
            {
                // Watch calls run until ended by the client canceling them.
                // The registration is disposed when the handler exits so the callback
                // does not stay attached to the token after the call is over.
                using (context.CancellationToken.Register(() =>
                {
                    RemoveWatcher(service, channel.Writer);

                    // Signal the writer is complete and the watch method can exit.
                    // TryComplete is used so a repeated completion can never throw.
                    channel.Writer.TryComplete();
                }))
                {
                    // Send current status immediately
                    HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
                    await responseStream.WriteAsync(response);

                    // Read messages. WaitToReadAsync will wait until new messages are available.
                    // Loop will exit when the call is canceled and the writer is marked as complete.
                    while (await channel.Reader.WaitToReadAsync())
                    {
                        if (channel.Reader.TryRead(out HealthCheckResponse item))
                        {
                            await responseStream.WriteAsync(item);
                        }
                    }
                }
            }
            finally
            {
                // Also unregister when the handler exits without the token having been canceled
                // (for example when WriteAsync throws); otherwise the writer would remain in
                // watchers and keep receiving notifications. Removal is a no-op if the
                // cancellation callback already ran.
                RemoveWatcher(service, channel.Writer);
            }
        }

        /// <summary>
        /// Removes a watcher's channel writer from the list registered for the given service,
        /// dropping the list itself once it becomes empty. Does nothing if the writer is not registered.
        /// </summary>
        /// <param name="service">The service the writer was registered for.</param>
        /// <param name="writer">The channel writer to remove.</param>
        private void RemoveWatcher(string service, ChannelWriter<HealthCheckResponse> writer)
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    // Remove the writer from the watchers
                    if (channelWriters.Remove(writer))
                    {
                        // Remove empty collection if service has no more response streams
                        if (channelWriters.Count == 0)
                        {
                            watchers.Remove(service);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Queues a response carrying the given status to every active watcher of the service.
        /// </summary>
        /// <param name="service">The service whose watchers should be notified.</param>
        /// <param name="status">The status to send.</param>
        /// <exception cref="InvalidOperationException">Thrown when a notification cannot be queued.</exception>
        private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    // The same response instance is queued to every watcher of the service.
                    HealthCheckResponse response = new HealthCheckResponse { Status = status };

                    foreach (ChannelWriter<HealthCheckResponse> writer in channelWriters)
                    {
                        if (!writer.TryWrite(response))
                        {
                            throw new InvalidOperationException("Unable to queue health check notification.");
                        }
                    }
                }
            }
        }
#endif

        /// <summary>
        /// Builds a response carrying the current status of the given service.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <param name="throwOnNotFound">
        /// When true, a service without a status causes an <see cref="RpcException"/>;
        /// when false, such a service is reported as ServiceUnknown.
        /// </param>
        /// <returns>A new response holding the service's status.</returns>
        /// <exception cref="RpcException">
        /// Thrown with <see cref="StatusCode.NotFound"/> when the service has no status and
        /// <paramref name="throwOnNotFound"/> is true.
        /// </exception>
        private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus status;
                if (!statusMap.TryGetValue(service, out status))
                {
                    if (throwOnNotFound)
                    {
                        // TODO(jtattermusch): returning specific status from server handler is not supported yet.
                        throw new RpcException(new Status(StatusCode.NotFound, ""));
                    }

                    status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
                }

                return new HealthCheckResponse { Status = status };
            }
        }

        /// <summary>
        /// Looks up the status recorded for a service. Callers in this class hold statusLock.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <returns>The recorded status, or ServiceUnknown when none has been set.</returns>
        private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service)
        {
            if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s))
            {
                return s;
            }

            // A service with no set status has a status of ServiceUnknown
            return HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
        }
    }
}