1 # Copyright 2017 gRPC authors.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests server certificate rotation.
16 Here we test various aspects of gRPC Python, and in some cases gRPC
17 Core by extension, support for server certificate rotation.
19 * ServerSSLCertReloadTestWithClientAuth: test ability to rotate
20 server's SSL cert for use in future channels with clients while not
21 affecting any existing channel. The server requires client
24 * ServerSSLCertReloadTestWithoutClientAuth: like
25 ServerSSLCertReloadTestWithClientAuth except that the server does
26 not authenticate the client.
28 * ServerSSLCertReloadTestCertConfigReuse: tests gRPC Python's ability
29 to deal with user's reuse of ServerCertificateConfiguration instances.
40 from concurrent import futures
43 from tests.unit import resources
44 from tests.unit import test_common
45 from tests.testing import _application_common
46 from tests.testing import _server_application
47 from tests.testing.proto import services_pb2_grpc
49 CA_1_PEM = resources.cert_hier_1_root_ca_cert()
50 CA_2_PEM = resources.cert_hier_2_root_ca_cert()
52 CLIENT_KEY_1_PEM = resources.cert_hier_1_client_1_key()
53 CLIENT_CERT_CHAIN_1_PEM = (resources.cert_hier_1_client_1_cert() +
54 resources.cert_hier_1_intermediate_ca_cert())
56 CLIENT_KEY_2_PEM = resources.cert_hier_2_client_1_key()
57 CLIENT_CERT_CHAIN_2_PEM = (resources.cert_hier_2_client_1_cert() +
58 resources.cert_hier_2_intermediate_ca_cert())
60 SERVER_KEY_1_PEM = resources.cert_hier_1_server_1_key()
61 SERVER_CERT_CHAIN_1_PEM = (resources.cert_hier_1_server_1_cert() +
62 resources.cert_hier_1_intermediate_ca_cert())
64 SERVER_KEY_2_PEM = resources.cert_hier_2_server_1_key()
65 SERVER_CERT_CHAIN_2_PEM = (resources.cert_hier_2_server_1_cert() +
66 resources.cert_hier_2_intermediate_ca_cert())
68 # for use with the CertConfigFetcher. Roughly a simple custom mock
70 Call = collections.namedtuple('Call', ['did_raise', 'returned_cert_config'])
73 def _create_channel(port, credentials):
74 return grpc.secure_channel('localhost:{}'.format(port), credentials)
77 def _create_client_stub(channel, expect_success):
79 # per Nathaniel: there's some robustness issue if we start
80 # using a channel without waiting for it to be actually ready
81 grpc.channel_ready_future(channel).result(timeout=10)
82 return services_pb2_grpc.FirstServiceStub(channel)
85 class CertConfigFetcher(object):
88 self._lock = threading.Lock()
90 self._should_raise = False
91 self._cert_config = None
96 self._should_raise = False
97 self._cert_config = None
99 def configure(self, should_raise, cert_config):
100 assert not (should_raise and cert_config), (
101 "should not specify both should_raise and a cert_config at the same time"
104 self._should_raise = should_raise
105 self._cert_config = cert_config
113 if self._should_raise:
114 self._calls.append(Call(True, None))
115 raise ValueError('just for fun, should not affect the test')
117 self._calls.append(Call(False, self._cert_config))
118 return self._cert_config
121 class _ServerSSLCertReloadTest(
122 six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
124 def __init__(self, *args, **kwargs):
125 super(_ServerSSLCertReloadTest, self).__init__(*args, **kwargs)
130 def require_client_auth(self):
131 raise NotImplementedError()
134 self.server = test_common.test_server()
135 services_pb2_grpc.add_FirstServiceServicer_to_server(
136 _server_application.FirstServiceServicer(), self.server)
137 switch_cert_on_client_num = 10
138 initial_cert_config = grpc.ssl_server_certificate_configuration(
139 [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
140 root_certificates=CA_2_PEM)
141 self.cert_config_fetcher = CertConfigFetcher()
142 server_credentials = grpc.dynamic_ssl_server_credentials(
144 self.cert_config_fetcher,
145 require_client_authentication=self.require_client_auth())
146 self.port = self.server.add_secure_port('[::]:0', server_credentials)
151 self.server.stop(None)
153 def _perform_rpc(self, client_stub, expect_success):
154 # we don't care about the actual response of the rpc; only
155 # whether we can perform it or not, and if not, the status
156 # code must be UNAVAILABLE
157 request = _application_common.UNARY_UNARY_REQUEST
159 response = client_stub.UnUn(request)
160 self.assertEqual(response, _application_common.UNARY_UNARY_RESPONSE)
162 with self.assertRaises(grpc.RpcError) as exception_context:
163 client_stub.UnUn(request)
164 self.assertEqual(exception_context.exception.code(),
165 grpc.StatusCode.UNAVAILABLE)
167 def _do_one_shot_client_rpc(self,
169 root_certificates=None,
171 certificate_chain=None):
172 credentials = grpc.ssl_channel_credentials(
173 root_certificates=root_certificates,
174 private_key=private_key,
175 certificate_chain=certificate_chain)
176 with _create_channel(self.port, credentials) as client_channel:
177 client_stub = _create_client_stub(client_channel, expect_success)
178 self._perform_rpc(client_stub, expect_success)
181 # things should work...
182 self.cert_config_fetcher.configure(False, None)
183 self._do_one_shot_client_rpc(True,
184 root_certificates=CA_1_PEM,
185 private_key=CLIENT_KEY_2_PEM,
186 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
187 actual_calls = self.cert_config_fetcher.getCalls()
188 self.assertEqual(len(actual_calls), 1)
189 self.assertFalse(actual_calls[0].did_raise)
190 self.assertIsNone(actual_calls[0].returned_cert_config)
192 # client should reject server...
193 # fails because client trusts ca2 and so will reject server
194 self.cert_config_fetcher.reset()
195 self.cert_config_fetcher.configure(False, None)
196 self._do_one_shot_client_rpc(False,
197 root_certificates=CA_2_PEM,
198 private_key=CLIENT_KEY_2_PEM,
199 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
200 actual_calls = self.cert_config_fetcher.getCalls()
201 self.assertGreaterEqual(len(actual_calls), 1)
202 self.assertFalse(actual_calls[0].did_raise)
203 for i, call in enumerate(actual_calls):
204 self.assertFalse(call.did_raise, 'i= {}'.format(i))
205 self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
207 # should work again...
208 self.cert_config_fetcher.reset()
209 self.cert_config_fetcher.configure(True, None)
210 self._do_one_shot_client_rpc(True,
211 root_certificates=CA_1_PEM,
212 private_key=CLIENT_KEY_2_PEM,
213 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
214 actual_calls = self.cert_config_fetcher.getCalls()
215 self.assertEqual(len(actual_calls), 1)
216 self.assertTrue(actual_calls[0].did_raise)
217 self.assertIsNone(actual_calls[0].returned_cert_config)
219 # if with_client_auth, then client should be rejected by
220 # server because client uses key/cert1, but server trusts ca2,
221 # so server will reject
222 self.cert_config_fetcher.reset()
223 self.cert_config_fetcher.configure(False, None)
224 self._do_one_shot_client_rpc(not self.require_client_auth(),
225 root_certificates=CA_1_PEM,
226 private_key=CLIENT_KEY_1_PEM,
227 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
228 actual_calls = self.cert_config_fetcher.getCalls()
229 self.assertGreaterEqual(len(actual_calls), 1)
230 for i, call in enumerate(actual_calls):
231 self.assertFalse(call.did_raise, 'i= {}'.format(i))
232 self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
234 # should work again...
235 self.cert_config_fetcher.reset()
236 self.cert_config_fetcher.configure(False, None)
237 self._do_one_shot_client_rpc(True,
238 root_certificates=CA_1_PEM,
239 private_key=CLIENT_KEY_2_PEM,
240 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
241 actual_calls = self.cert_config_fetcher.getCalls()
242 self.assertEqual(len(actual_calls), 1)
243 self.assertFalse(actual_calls[0].did_raise)
244 self.assertIsNone(actual_calls[0].returned_cert_config)
246 # now create the "persistent" clients
247 self.cert_config_fetcher.reset()
248 self.cert_config_fetcher.configure(False, None)
249 channel_A = _create_channel(
251 grpc.ssl_channel_credentials(
252 root_certificates=CA_1_PEM,
253 private_key=CLIENT_KEY_2_PEM,
254 certificate_chain=CLIENT_CERT_CHAIN_2_PEM))
255 persistent_client_stub_A = _create_client_stub(channel_A, True)
256 self._perform_rpc(persistent_client_stub_A, True)
257 actual_calls = self.cert_config_fetcher.getCalls()
258 self.assertEqual(len(actual_calls), 1)
259 self.assertFalse(actual_calls[0].did_raise)
260 self.assertIsNone(actual_calls[0].returned_cert_config)
262 self.cert_config_fetcher.reset()
263 self.cert_config_fetcher.configure(False, None)
264 channel_B = _create_channel(
266 grpc.ssl_channel_credentials(
267 root_certificates=CA_1_PEM,
268 private_key=CLIENT_KEY_2_PEM,
269 certificate_chain=CLIENT_CERT_CHAIN_2_PEM))
270 persistent_client_stub_B = _create_client_stub(channel_B, True)
271 self._perform_rpc(persistent_client_stub_B, True)
272 actual_calls = self.cert_config_fetcher.getCalls()
273 self.assertEqual(len(actual_calls), 1)
274 self.assertFalse(actual_calls[0].did_raise)
275 self.assertIsNone(actual_calls[0].returned_cert_config)
277 # moment of truth!! client should reject server because the
278 # server switch cert...
279 cert_config = grpc.ssl_server_certificate_configuration(
280 [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
281 root_certificates=CA_1_PEM)
282 self.cert_config_fetcher.reset()
283 self.cert_config_fetcher.configure(False, cert_config)
284 self._do_one_shot_client_rpc(False,
285 root_certificates=CA_1_PEM,
286 private_key=CLIENT_KEY_2_PEM,
287 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
288 actual_calls = self.cert_config_fetcher.getCalls()
289 self.assertGreaterEqual(len(actual_calls), 1)
290 self.assertFalse(actual_calls[0].did_raise)
291 for i, call in enumerate(actual_calls):
292 self.assertFalse(call.did_raise, 'i= {}'.format(i))
293 self.assertEqual(call.returned_cert_config, cert_config,
296 # now should work again...
297 self.cert_config_fetcher.reset()
298 self.cert_config_fetcher.configure(False, None)
299 self._do_one_shot_client_rpc(True,
300 root_certificates=CA_2_PEM,
301 private_key=CLIENT_KEY_1_PEM,
302 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
303 actual_calls = self.cert_config_fetcher.getCalls()
304 self.assertEqual(len(actual_calls), 1)
305 self.assertFalse(actual_calls[0].did_raise)
306 self.assertIsNone(actual_calls[0].returned_cert_config)
308 # client should be rejected by server if with_client_auth
309 self.cert_config_fetcher.reset()
310 self.cert_config_fetcher.configure(False, None)
311 self._do_one_shot_client_rpc(not self.require_client_auth(),
312 root_certificates=CA_2_PEM,
313 private_key=CLIENT_KEY_2_PEM,
314 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
315 actual_calls = self.cert_config_fetcher.getCalls()
316 self.assertGreaterEqual(len(actual_calls), 1)
317 for i, call in enumerate(actual_calls):
318 self.assertFalse(call.did_raise, 'i= {}'.format(i))
319 self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
321 # here client should reject server...
322 self.cert_config_fetcher.reset()
323 self.cert_config_fetcher.configure(False, None)
324 self._do_one_shot_client_rpc(False,
325 root_certificates=CA_1_PEM,
326 private_key=CLIENT_KEY_2_PEM,
327 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
328 actual_calls = self.cert_config_fetcher.getCalls()
329 self.assertGreaterEqual(len(actual_calls), 1)
330 for i, call in enumerate(actual_calls):
331 self.assertFalse(call.did_raise, 'i= {}'.format(i))
332 self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
334 # persistent clients should continue to work
335 self.cert_config_fetcher.reset()
336 self.cert_config_fetcher.configure(False, None)
337 self._perform_rpc(persistent_client_stub_A, True)
338 actual_calls = self.cert_config_fetcher.getCalls()
339 self.assertEqual(len(actual_calls), 0)
341 self.cert_config_fetcher.reset()
342 self.cert_config_fetcher.configure(False, None)
343 self._perform_rpc(persistent_client_stub_B, True)
344 actual_calls = self.cert_config_fetcher.getCalls()
345 self.assertEqual(len(actual_calls), 0)
351 class ServerSSLCertConfigFetcherParamsChecks(unittest.TestCase):
353 def test_check_on_initial_config(self):
354 with self.assertRaises(TypeError):
355 grpc.dynamic_ssl_server_credentials(None, str)
356 with self.assertRaises(TypeError):
357 grpc.dynamic_ssl_server_credentials(1, str)
359 def test_check_on_config_fetcher(self):
360 cert_config = grpc.ssl_server_certificate_configuration(
361 [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
362 root_certificates=CA_1_PEM)
363 with self.assertRaises(TypeError):
364 grpc.dynamic_ssl_server_credentials(cert_config, None)
365 with self.assertRaises(TypeError):
366 grpc.dynamic_ssl_server_credentials(cert_config, 1)
369 class ServerSSLCertReloadTestWithClientAuth(_ServerSSLCertReloadTest):
371 def require_client_auth(self):
374 test = _ServerSSLCertReloadTest._test
377 class ServerSSLCertReloadTestWithoutClientAuth(_ServerSSLCertReloadTest):
379 def require_client_auth(self):
382 test = _ServerSSLCertReloadTest._test
385 class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest):
386 """Ensures that `ServerCertificateConfiguration` instances can be reused.
388 Because gRPC Core takes ownership of the
389 `grpc_ssl_server_certificate_config` encapsulated by
390 `ServerCertificateConfiguration`, this test reuses the same
391 `ServerCertificateConfiguration` instances multiple times to make sure
392 gRPC Python takes care of maintaining the validity of
393 `ServerCertificateConfiguration` instances, so that such instances can be
394 re-used by user application.
397 def require_client_auth(self):
401 self.server = test_common.test_server()
402 services_pb2_grpc.add_FirstServiceServicer_to_server(
403 _server_application.FirstServiceServicer(), self.server)
404 self.cert_config_A = grpc.ssl_server_certificate_configuration(
405 [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
406 root_certificates=CA_2_PEM)
407 self.cert_config_B = grpc.ssl_server_certificate_configuration(
408 [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
409 root_certificates=CA_1_PEM)
410 self.cert_config_fetcher = CertConfigFetcher()
411 server_credentials = grpc.dynamic_ssl_server_credentials(
413 self.cert_config_fetcher,
414 require_client_authentication=True)
415 self.port = self.server.add_secure_port('[::]:0', server_credentials)
418 def test_cert_config_reuse(self):
421 self.cert_config_fetcher.reset()
422 self.cert_config_fetcher.configure(False, self.cert_config_A)
423 self._do_one_shot_client_rpc(True,
424 root_certificates=CA_1_PEM,
425 private_key=CLIENT_KEY_2_PEM,
426 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
427 actual_calls = self.cert_config_fetcher.getCalls()
428 self.assertEqual(len(actual_calls), 1)
429 self.assertFalse(actual_calls[0].did_raise)
430 self.assertEqual(actual_calls[0].returned_cert_config,
434 self.cert_config_fetcher.reset()
435 self.cert_config_fetcher.configure(False, self.cert_config_A)
436 self._do_one_shot_client_rpc(False,
437 root_certificates=CA_2_PEM,
438 private_key=CLIENT_KEY_1_PEM,
439 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
440 actual_calls = self.cert_config_fetcher.getCalls()
441 self.assertGreaterEqual(len(actual_calls), 1)
442 self.assertFalse(actual_calls[0].did_raise)
443 for i, call in enumerate(actual_calls):
444 self.assertFalse(call.did_raise, 'i= {}'.format(i))
445 self.assertEqual(call.returned_cert_config, self.cert_config_A,
448 # succeed again with A
449 self.cert_config_fetcher.reset()
450 self.cert_config_fetcher.configure(False, self.cert_config_A)
451 self._do_one_shot_client_rpc(True,
452 root_certificates=CA_1_PEM,
453 private_key=CLIENT_KEY_2_PEM,
454 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
455 actual_calls = self.cert_config_fetcher.getCalls()
456 self.assertEqual(len(actual_calls), 1)
457 self.assertFalse(actual_calls[0].did_raise)
458 self.assertEqual(actual_calls[0].returned_cert_config,
462 self.cert_config_fetcher.reset()
463 self.cert_config_fetcher.configure(False, self.cert_config_B)
464 self._do_one_shot_client_rpc(True,
465 root_certificates=CA_2_PEM,
466 private_key=CLIENT_KEY_1_PEM,
467 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
468 actual_calls = self.cert_config_fetcher.getCalls()
469 self.assertEqual(len(actual_calls), 1)
470 self.assertFalse(actual_calls[0].did_raise)
471 self.assertEqual(actual_calls[0].returned_cert_config,
475 self.cert_config_fetcher.reset()
476 self.cert_config_fetcher.configure(False, self.cert_config_B)
477 self._do_one_shot_client_rpc(False,
478 root_certificates=CA_1_PEM,
479 private_key=CLIENT_KEY_2_PEM,
480 certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
481 actual_calls = self.cert_config_fetcher.getCalls()
482 self.assertGreaterEqual(len(actual_calls), 1)
483 self.assertFalse(actual_calls[0].did_raise)
484 for i, call in enumerate(actual_calls):
485 self.assertFalse(call.did_raise, 'i= {}'.format(i))
486 self.assertEqual(call.returned_cert_config, self.cert_config_B,
489 # succeed again with B
490 self.cert_config_fetcher.reset()
491 self.cert_config_fetcher.configure(False, self.cert_config_B)
492 self._do_one_shot_client_rpc(True,
493 root_certificates=CA_2_PEM,
494 private_key=CLIENT_KEY_1_PEM,
495 certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
496 actual_calls = self.cert_config_fetcher.getCalls()
497 self.assertEqual(len(actual_calls), 1)
498 self.assertFalse(actual_calls[0].did_raise)
499 self.assertEqual(actual_calls[0].returned_cert_config,
503 if __name__ == '__main__':
504 logging.basicConfig()
505 unittest.main(verbosity=2)