1 #region Copyright notice and license
2 // Copyright 2015 gRPC authors.
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
8 // http://www.apache.org/licenses/LICENSE-2.0
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
18 using System.Collections.Generic;
20 #if GRPC_SUPPORT_WATCH
21 using System.Threading.Channels;
23 using System.Threading.Tasks;
namespace Grpc.HealthCheck
{
    /// <summary>
    /// Implementation of a simple Health service. Useful for health checking.
    ///
    /// Registering service with a server:
    /// <code>
    /// var serviceImpl = new HealthServiceImpl();
    /// server = new Server();
    /// server.AddServiceDefinition(Grpc.Health.V1.Health.BindService(serviceImpl));
    /// </code>
    /// </summary>
    public class HealthServiceImpl : Grpc.Health.V1.Health.HealthBase
    {
        // The maximum number of statuses to buffer on the server.
        internal const int MaxStatusBufferSize = 5;

        // Guards every read and write of statusMap.
        private readonly object statusLock = new object();
        private readonly Dictionary<string, HealthCheckResponse.Types.ServingStatus> statusMap =
            new Dictionary<string, HealthCheckResponse.Types.ServingStatus>();

#if GRPC_SUPPORT_WATCH
        // Guards every read and write of watchers (the dictionary and the lists stored in it).
        private readonly object watchersLock = new object();
        private readonly Dictionary<string, List<ChannelWriter<HealthCheckResponse>>> watchers =
            new Dictionary<string, List<ChannelWriter<HealthCheckResponse>>>();
#endif

        /// <summary>
        /// Sets the health status for given service.
        /// When watch support is compiled in, active watchers of the service are
        /// notified if the new status differs from the previous one.
        /// </summary>
        /// <param name="service">The service. Cannot be null.</param>
        /// <param name="status">the health status</param>
        public void SetStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            HealthCheckResponse.Types.ServingStatus previousStatus;
            lock (statusLock)
            {
                previousStatus = GetServiceStatus(service);
                statusMap[service] = status;
            }

#if GRPC_SUPPORT_WATCH
            // Notification happens outside statusLock; NotifyStatus takes watchersLock itself.
            if (status != previousStatus)
            {
                NotifyStatus(service, status);
            }
#endif
        }

        /// <summary>
        /// Clears health status for given service.
        /// When watch support is compiled in, active watchers are told the service is now
        /// <c>ServiceUnknown</c>, unless that already was its effective status.
        /// </summary>
        /// <param name="service">The service. Cannot be null.</param>
        public void ClearStatus(string service)
        {
            HealthCheckResponse.Types.ServingStatus previousStatus;
            lock (statusLock)
            {
                previousStatus = GetServiceStatus(service);
                statusMap.Remove(service);
            }

#if GRPC_SUPPORT_WATCH
            if (previousStatus != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
            {
                NotifyStatus(service, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
            }
#endif
        }

        /// <summary>
        /// Clears statuses for all services.
        /// When watch support is compiled in, watchers of every service whose status was not
        /// already <c>ServiceUnknown</c> are notified of the change.
        /// </summary>
        public void ClearAll()
        {
            List<KeyValuePair<string, HealthCheckResponse.Types.ServingStatus>> statuses;
            lock (statusLock)
            {
                // Snapshot the entries first so watchers can be notified after the lock is released.
                statuses = statusMap.ToList();
                statusMap.Clear();
            }

#if GRPC_SUPPORT_WATCH
            foreach (KeyValuePair<string, HealthCheckResponse.Types.ServingStatus> status in statuses)
            {
                if (status.Value != HealthCheckResponse.Types.ServingStatus.ServiceUnknown)
                {
                    NotifyStatus(status.Key, HealthCheckResponse.Types.ServingStatus.ServiceUnknown);
                }
            }
#endif
        }

        /// <summary>
        /// Performs a health status check.
        /// </summary>
        /// <param name="request">The check request.</param>
        /// <param name="context">The call context.</param>
        /// <returns>The asynchronous response.</returns>
        /// <exception cref="RpcException">
        /// Thrown with <c>StatusCode.NotFound</c> when no status has been set for the requested service.
        /// </exception>
        public override Task<HealthCheckResponse> Check(HealthCheckRequest request, ServerCallContext context)
        {
            HealthCheckResponse response = GetHealthCheckResponse(request.Service, throwOnNotFound: true);

            return Task.FromResult(response);
        }

#if GRPC_SUPPORT_WATCH
        /// <summary>
        /// Performs a watch for the serving status of the requested service.
        /// The server will immediately send back a message indicating the current
        /// serving status. It will then subsequently send a new message whenever
        /// the service's serving status changes.
        ///
        /// If the requested service is unknown when the call is received, the
        /// server will send a message setting the serving status to
        /// SERVICE_UNKNOWN but will *not* terminate the call. If at some
        /// future point, the serving status of the service becomes known, the
        /// server will send a new message with the service's serving status.
        ///
        /// If the call terminates with status UNIMPLEMENTED, then clients
        /// should assume this method is not supported and should not retry the
        /// call. If the call terminates with any other status (including OK),
        /// clients should retry the call with appropriate exponential backoff.
        /// </summary>
        /// <param name="request">The request received from the client.</param>
        /// <param name="responseStream">Used for sending responses back to the client.</param>
        /// <param name="context">The context of the server-side call handler being invoked.</param>
        /// <returns>A task indicating completion of the handler.</returns>
        public override async Task Watch(HealthCheckRequest request, IServerStreamWriter<HealthCheckResponse> responseStream, ServerCallContext context)
        {
            string service = request.Service;

            // Channel is used to marshal multiple callers updating status into a single queue.
            // This is required because IServerStreamWriter is not thread safe.
            //
            // A queue of unwritten statuses could build up if flow control causes responseStream.WriteAsync to await.
            // When this number is exceeded the server will discard older statuses. The discarded intermediate statuses
            // will never be sent to the client.
            Channel<HealthCheckResponse> channel = Channel.CreateBounded<HealthCheckResponse>(new BoundedChannelOptions(capacity: MaxStatusBufferSize) {
                SingleReader = true,
                SingleWriter = false,
                FullMode = BoundedChannelFullMode.DropOldest
            });
            ChannelWriter<HealthCheckResponse> writer = channel.Writer;

            lock (watchersLock)
            {
                if (!watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    channelWriters = new List<ChannelWriter<HealthCheckResponse>>();
                    watchers.Add(service, channelWriters);
                }

                channelWriters.Add(writer);
            }

            // Watch calls run until ended by the client canceling them. Cancellation unregisters
            // the writer and completes the channel, which lets the read loop below exit.
            var cancellationRegistration = context.CancellationToken.Register(() => RemoveWatcher(service, writer));
            try
            {
                // Send current status immediately
                HealthCheckResponse response = GetHealthCheckResponse(service, throwOnNotFound: false);
                await responseStream.WriteAsync(response);

                // Read messages. WaitToReadAsync will wait until new messages are available.
                // Loop will exit when the call is canceled and the writer is marked as complete.
                while (await channel.Reader.WaitToReadAsync())
                {
                    if (channel.Reader.TryRead(out HealthCheckResponse item))
                    {
                        await responseStream.WriteAsync(item);
                    }
                }
            }
            finally
            {
                // If the handler exits for any reason other than cancellation (for example a
                // failed write), the writer must still be unregistered; otherwise NotifyStatus
                // would keep queueing into a channel nobody reads. RemoveWatcher is idempotent,
                // so running it after the cancellation callback has already run is harmless.
                cancellationRegistration.Dispose();
                RemoveWatcher(service, writer);
            }
        }

        /// <summary>
        /// Unregisters a watch channel writer for the given service and marks it complete.
        /// Safe to call more than once for the same writer.
        /// </summary>
        /// <param name="service">The service the writer was registered for.</param>
        /// <param name="writer">The channel writer to unregister and complete.</param>
        private void RemoveWatcher(string service, ChannelWriter<HealthCheckResponse> writer)
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    // Remove the writer from the watchers
                    if (channelWriters.Remove(writer))
                    {
                        // Remove empty collection if service has no more response streams
                        if (channelWriters.Count == 0)
                        {
                            watchers.Remove(service);
                        }
                    }
                }
            }

            // Signal the writer is complete and the watch method can exit.
            // TryComplete (rather than Complete) keeps a repeated call from throwing.
            writer.TryComplete();
        }

        /// <summary>
        /// Queues a status update to every active watcher of the given service.
        /// </summary>
        /// <param name="service">The service whose status changed.</param>
        /// <param name="status">The new status to report.</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when the update cannot be queued to one of the watchers.
        /// </exception>
        private void NotifyStatus(string service, HealthCheckResponse.Types.ServingStatus status)
        {
            lock (watchersLock)
            {
                if (watchers.TryGetValue(service, out List<ChannelWriter<HealthCheckResponse>> channelWriters))
                {
                    // One response instance is shared by all watchers of the service.
                    HealthCheckResponse response = new HealthCheckResponse { Status = status };

                    foreach (ChannelWriter<HealthCheckResponse> writer in channelWriters)
                    {
                        if (!writer.TryWrite(response))
                        {
                            throw new InvalidOperationException("Unable to queue health check notification.");
                        }
                    }
                }
            }
        }
#endif

        /// <summary>
        /// Builds a response carrying the current status of the given service.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <param name="throwOnNotFound">
        /// When true, a service without a status results in an <see cref="RpcException"/>;
        /// when false, it is reported as <c>ServiceUnknown</c>.
        /// </param>
        /// <returns>A new response with the service's status.</returns>
        private HealthCheckResponse GetHealthCheckResponse(string service, bool throwOnNotFound)
        {
            lock (statusLock)
            {
                HealthCheckResponse.Types.ServingStatus status;
                if (!statusMap.TryGetValue(service, out status))
                {
                    if (throwOnNotFound)
                    {
                        // TODO(jtattermusch): returning specific status from server handler is not supported yet.
                        throw new RpcException(new Status(StatusCode.NotFound, ""));
                    }

                    status = HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
                }

                return new HealthCheckResponse { Status = status };
            }
        }

        /// <summary>
        /// Reads the stored status of a service. Does not take <c>statusLock</c> itself;
        /// the callers in this class invoke it while already holding that lock.
        /// </summary>
        /// <param name="service">The service to look up.</param>
        /// <returns>The stored status, or <c>ServiceUnknown</c> when none is set.</returns>
        private HealthCheckResponse.Types.ServingStatus GetServiceStatus(string service)
        {
            if (statusMap.TryGetValue(service, out HealthCheckResponse.Types.ServingStatus s))
            {
                return s;
            }

            // A service with no set status has a status of ServiceUnknown
            return HealthCheckResponse.Types.ServingStatus.ServiceUnknown;
        }
    }
}