Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _server_ssl_cert_config_test.py
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests server certificate rotation.
15
16 Here we test various aspects of gRPC Python, and in some cases gRPC
17 Core by extension, support for server certificate rotation.
18
19 * ServerSSLCertReloadTestWithClientAuth: test ability to rotate
20   server's SSL cert for use in future channels with clients while not
21   affecting any existing channel. The server requires client
22   authentication.
23
24 * ServerSSLCertReloadTestWithoutClientAuth: like
25   ServerSSLCertReloadTestWithClientAuth except that the server does
26   not authenticate the client.
27
28 * ServerSSLCertReloadTestCertConfigReuse: tests gRPC Python's ability
29   to deal with user's reuse of ServerCertificateConfiguration instances.
30 """
31
32 import abc
33 import collections
34 import os
35 import six
36 import threading
37 import unittest
38 import logging
39
40 from concurrent import futures
41
42 import grpc
43 from tests.unit import resources
44 from tests.unit import test_common
45 from tests.testing import _application_common
46 from tests.testing import _server_application
47 from tests.testing.proto import services_pb2_grpc
48
49 CA_1_PEM = resources.cert_hier_1_root_ca_cert()
50 CA_2_PEM = resources.cert_hier_2_root_ca_cert()
51
52 CLIENT_KEY_1_PEM = resources.cert_hier_1_client_1_key()
53 CLIENT_CERT_CHAIN_1_PEM = (resources.cert_hier_1_client_1_cert() +
54                            resources.cert_hier_1_intermediate_ca_cert())
55
56 CLIENT_KEY_2_PEM = resources.cert_hier_2_client_1_key()
57 CLIENT_CERT_CHAIN_2_PEM = (resources.cert_hier_2_client_1_cert() +
58                            resources.cert_hier_2_intermediate_ca_cert())
59
60 SERVER_KEY_1_PEM = resources.cert_hier_1_server_1_key()
61 SERVER_CERT_CHAIN_1_PEM = (resources.cert_hier_1_server_1_cert() +
62                            resources.cert_hier_1_intermediate_ca_cert())
63
64 SERVER_KEY_2_PEM = resources.cert_hier_2_server_1_key()
65 SERVER_CERT_CHAIN_2_PEM = (resources.cert_hier_2_server_1_cert() +
66                            resources.cert_hier_2_intermediate_ca_cert())
67
68 # for use with the CertConfigFetcher. Roughly a simple custom mock
69 # implementation
70 Call = collections.namedtuple('Call', ['did_raise', 'returned_cert_config'])
71
72
73 def _create_channel(port, credentials):
74     return grpc.secure_channel('localhost:{}'.format(port), credentials)
75
76
77 def _create_client_stub(channel, expect_success):
78     if expect_success:
79         # per Nathaniel: there's some robustness issue if we start
80         # using a channel without waiting for it to be actually ready
81         grpc.channel_ready_future(channel).result(timeout=10)
82     return services_pb2_grpc.FirstServiceStub(channel)
83
84
85 class CertConfigFetcher(object):
86
87     def __init__(self):
88         self._lock = threading.Lock()
89         self._calls = []
90         self._should_raise = False
91         self._cert_config = None
92
93     def reset(self):
94         with self._lock:
95             self._calls = []
96             self._should_raise = False
97             self._cert_config = None
98
99     def configure(self, should_raise, cert_config):
100         assert not (should_raise and cert_config), (
101             "should not specify both should_raise and a cert_config at the same time"
102         )
103         with self._lock:
104             self._should_raise = should_raise
105             self._cert_config = cert_config
106
107     def getCalls(self):
108         with self._lock:
109             return self._calls
110
111     def __call__(self):
112         with self._lock:
113             if self._should_raise:
114                 self._calls.append(Call(True, None))
115                 raise ValueError('just for fun, should not affect the test')
116             else:
117                 self._calls.append(Call(False, self._cert_config))
118                 return self._cert_config
119
120
121 class _ServerSSLCertReloadTest(
122         six.with_metaclass(abc.ABCMeta, unittest.TestCase)):
123
124     def __init__(self, *args, **kwargs):
125         super(_ServerSSLCertReloadTest, self).__init__(*args, **kwargs)
126         self.server = None
127         self.port = None
128
129     @abc.abstractmethod
130     def require_client_auth(self):
131         raise NotImplementedError()
132
133     def setUp(self):
134         self.server = test_common.test_server()
135         services_pb2_grpc.add_FirstServiceServicer_to_server(
136             _server_application.FirstServiceServicer(), self.server)
137         switch_cert_on_client_num = 10
138         initial_cert_config = grpc.ssl_server_certificate_configuration(
139             [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
140             root_certificates=CA_2_PEM)
141         self.cert_config_fetcher = CertConfigFetcher()
142         server_credentials = grpc.dynamic_ssl_server_credentials(
143             initial_cert_config,
144             self.cert_config_fetcher,
145             require_client_authentication=self.require_client_auth())
146         self.port = self.server.add_secure_port('[::]:0', server_credentials)
147         self.server.start()
148
149     def tearDown(self):
150         if self.server:
151             self.server.stop(None)
152
153     def _perform_rpc(self, client_stub, expect_success):
154         # we don't care about the actual response of the rpc; only
155         # whether we can perform it or not, and if not, the status
156         # code must be UNAVAILABLE
157         request = _application_common.UNARY_UNARY_REQUEST
158         if expect_success:
159             response = client_stub.UnUn(request)
160             self.assertEqual(response, _application_common.UNARY_UNARY_RESPONSE)
161         else:
162             with self.assertRaises(grpc.RpcError) as exception_context:
163                 client_stub.UnUn(request)
164             self.assertEqual(exception_context.exception.code(),
165                              grpc.StatusCode.UNAVAILABLE)
166
167     def _do_one_shot_client_rpc(self,
168                                 expect_success,
169                                 root_certificates=None,
170                                 private_key=None,
171                                 certificate_chain=None):
172         credentials = grpc.ssl_channel_credentials(
173             root_certificates=root_certificates,
174             private_key=private_key,
175             certificate_chain=certificate_chain)
176         with _create_channel(self.port, credentials) as client_channel:
177             client_stub = _create_client_stub(client_channel, expect_success)
178             self._perform_rpc(client_stub, expect_success)
179
180     def _test(self):
181         # things should work...
182         self.cert_config_fetcher.configure(False, None)
183         self._do_one_shot_client_rpc(True,
184                                      root_certificates=CA_1_PEM,
185                                      private_key=CLIENT_KEY_2_PEM,
186                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
187         actual_calls = self.cert_config_fetcher.getCalls()
188         self.assertEqual(len(actual_calls), 1)
189         self.assertFalse(actual_calls[0].did_raise)
190         self.assertIsNone(actual_calls[0].returned_cert_config)
191
192         # client should reject server...
193         # fails because client trusts ca2 and so will reject server
194         self.cert_config_fetcher.reset()
195         self.cert_config_fetcher.configure(False, None)
196         self._do_one_shot_client_rpc(False,
197                                      root_certificates=CA_2_PEM,
198                                      private_key=CLIENT_KEY_2_PEM,
199                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
200         actual_calls = self.cert_config_fetcher.getCalls()
201         self.assertGreaterEqual(len(actual_calls), 1)
202         self.assertFalse(actual_calls[0].did_raise)
203         for i, call in enumerate(actual_calls):
204             self.assertFalse(call.did_raise, 'i= {}'.format(i))
205             self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
206
207         # should work again...
208         self.cert_config_fetcher.reset()
209         self.cert_config_fetcher.configure(True, None)
210         self._do_one_shot_client_rpc(True,
211                                      root_certificates=CA_1_PEM,
212                                      private_key=CLIENT_KEY_2_PEM,
213                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
214         actual_calls = self.cert_config_fetcher.getCalls()
215         self.assertEqual(len(actual_calls), 1)
216         self.assertTrue(actual_calls[0].did_raise)
217         self.assertIsNone(actual_calls[0].returned_cert_config)
218
219         # if with_client_auth, then client should be rejected by
220         # server because client uses key/cert1, but server trusts ca2,
221         # so server will reject
222         self.cert_config_fetcher.reset()
223         self.cert_config_fetcher.configure(False, None)
224         self._do_one_shot_client_rpc(not self.require_client_auth(),
225                                      root_certificates=CA_1_PEM,
226                                      private_key=CLIENT_KEY_1_PEM,
227                                      certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
228         actual_calls = self.cert_config_fetcher.getCalls()
229         self.assertGreaterEqual(len(actual_calls), 1)
230         for i, call in enumerate(actual_calls):
231             self.assertFalse(call.did_raise, 'i= {}'.format(i))
232             self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
233
234         # should work again...
235         self.cert_config_fetcher.reset()
236         self.cert_config_fetcher.configure(False, None)
237         self._do_one_shot_client_rpc(True,
238                                      root_certificates=CA_1_PEM,
239                                      private_key=CLIENT_KEY_2_PEM,
240                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
241         actual_calls = self.cert_config_fetcher.getCalls()
242         self.assertEqual(len(actual_calls), 1)
243         self.assertFalse(actual_calls[0].did_raise)
244         self.assertIsNone(actual_calls[0].returned_cert_config)
245
246         # now create the "persistent" clients
247         self.cert_config_fetcher.reset()
248         self.cert_config_fetcher.configure(False, None)
249         channel_A = _create_channel(
250             self.port,
251             grpc.ssl_channel_credentials(
252                 root_certificates=CA_1_PEM,
253                 private_key=CLIENT_KEY_2_PEM,
254                 certificate_chain=CLIENT_CERT_CHAIN_2_PEM))
255         persistent_client_stub_A = _create_client_stub(channel_A, True)
256         self._perform_rpc(persistent_client_stub_A, True)
257         actual_calls = self.cert_config_fetcher.getCalls()
258         self.assertEqual(len(actual_calls), 1)
259         self.assertFalse(actual_calls[0].did_raise)
260         self.assertIsNone(actual_calls[0].returned_cert_config)
261
262         self.cert_config_fetcher.reset()
263         self.cert_config_fetcher.configure(False, None)
264         channel_B = _create_channel(
265             self.port,
266             grpc.ssl_channel_credentials(
267                 root_certificates=CA_1_PEM,
268                 private_key=CLIENT_KEY_2_PEM,
269                 certificate_chain=CLIENT_CERT_CHAIN_2_PEM))
270         persistent_client_stub_B = _create_client_stub(channel_B, True)
271         self._perform_rpc(persistent_client_stub_B, True)
272         actual_calls = self.cert_config_fetcher.getCalls()
273         self.assertEqual(len(actual_calls), 1)
274         self.assertFalse(actual_calls[0].did_raise)
275         self.assertIsNone(actual_calls[0].returned_cert_config)
276
277         # moment of truth!! client should reject server because the
278         # server switch cert...
279         cert_config = grpc.ssl_server_certificate_configuration(
280             [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
281             root_certificates=CA_1_PEM)
282         self.cert_config_fetcher.reset()
283         self.cert_config_fetcher.configure(False, cert_config)
284         self._do_one_shot_client_rpc(False,
285                                      root_certificates=CA_1_PEM,
286                                      private_key=CLIENT_KEY_2_PEM,
287                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
288         actual_calls = self.cert_config_fetcher.getCalls()
289         self.assertGreaterEqual(len(actual_calls), 1)
290         self.assertFalse(actual_calls[0].did_raise)
291         for i, call in enumerate(actual_calls):
292             self.assertFalse(call.did_raise, 'i= {}'.format(i))
293             self.assertEqual(call.returned_cert_config, cert_config,
294                              'i= {}'.format(i))
295
296         # now should work again...
297         self.cert_config_fetcher.reset()
298         self.cert_config_fetcher.configure(False, None)
299         self._do_one_shot_client_rpc(True,
300                                      root_certificates=CA_2_PEM,
301                                      private_key=CLIENT_KEY_1_PEM,
302                                      certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
303         actual_calls = self.cert_config_fetcher.getCalls()
304         self.assertEqual(len(actual_calls), 1)
305         self.assertFalse(actual_calls[0].did_raise)
306         self.assertIsNone(actual_calls[0].returned_cert_config)
307
308         # client should be rejected by server if with_client_auth
309         self.cert_config_fetcher.reset()
310         self.cert_config_fetcher.configure(False, None)
311         self._do_one_shot_client_rpc(not self.require_client_auth(),
312                                      root_certificates=CA_2_PEM,
313                                      private_key=CLIENT_KEY_2_PEM,
314                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
315         actual_calls = self.cert_config_fetcher.getCalls()
316         self.assertGreaterEqual(len(actual_calls), 1)
317         for i, call in enumerate(actual_calls):
318             self.assertFalse(call.did_raise, 'i= {}'.format(i))
319             self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
320
321         # here client should reject server...
322         self.cert_config_fetcher.reset()
323         self.cert_config_fetcher.configure(False, None)
324         self._do_one_shot_client_rpc(False,
325                                      root_certificates=CA_1_PEM,
326                                      private_key=CLIENT_KEY_2_PEM,
327                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
328         actual_calls = self.cert_config_fetcher.getCalls()
329         self.assertGreaterEqual(len(actual_calls), 1)
330         for i, call in enumerate(actual_calls):
331             self.assertFalse(call.did_raise, 'i= {}'.format(i))
332             self.assertIsNone(call.returned_cert_config, 'i= {}'.format(i))
333
334         # persistent clients should continue to work
335         self.cert_config_fetcher.reset()
336         self.cert_config_fetcher.configure(False, None)
337         self._perform_rpc(persistent_client_stub_A, True)
338         actual_calls = self.cert_config_fetcher.getCalls()
339         self.assertEqual(len(actual_calls), 0)
340
341         self.cert_config_fetcher.reset()
342         self.cert_config_fetcher.configure(False, None)
343         self._perform_rpc(persistent_client_stub_B, True)
344         actual_calls = self.cert_config_fetcher.getCalls()
345         self.assertEqual(len(actual_calls), 0)
346
347         channel_A.close()
348         channel_B.close()
349
350
351 class ServerSSLCertConfigFetcherParamsChecks(unittest.TestCase):
352
353     def test_check_on_initial_config(self):
354         with self.assertRaises(TypeError):
355             grpc.dynamic_ssl_server_credentials(None, str)
356         with self.assertRaises(TypeError):
357             grpc.dynamic_ssl_server_credentials(1, str)
358
359     def test_check_on_config_fetcher(self):
360         cert_config = grpc.ssl_server_certificate_configuration(
361             [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
362             root_certificates=CA_1_PEM)
363         with self.assertRaises(TypeError):
364             grpc.dynamic_ssl_server_credentials(cert_config, None)
365         with self.assertRaises(TypeError):
366             grpc.dynamic_ssl_server_credentials(cert_config, 1)
367
368
369 class ServerSSLCertReloadTestWithClientAuth(_ServerSSLCertReloadTest):
370
371     def require_client_auth(self):
372         return True
373
374     test = _ServerSSLCertReloadTest._test
375
376
377 class ServerSSLCertReloadTestWithoutClientAuth(_ServerSSLCertReloadTest):
378
379     def require_client_auth(self):
380         return False
381
382     test = _ServerSSLCertReloadTest._test
383
384
385 class ServerSSLCertReloadTestCertConfigReuse(_ServerSSLCertReloadTest):
386     """Ensures that `ServerCertificateConfiguration` instances can be reused.
387
388     Because gRPC Core takes ownership of the
389     `grpc_ssl_server_certificate_config` encapsulated by
390     `ServerCertificateConfiguration`, this test reuses the same
391     `ServerCertificateConfiguration` instances multiple times to make sure
392     gRPC Python takes care of maintaining the validity of
393     `ServerCertificateConfiguration` instances, so that such instances can be
394     re-used by user application.
395     """
396
397     def require_client_auth(self):
398         return True
399
400     def setUp(self):
401         self.server = test_common.test_server()
402         services_pb2_grpc.add_FirstServiceServicer_to_server(
403             _server_application.FirstServiceServicer(), self.server)
404         self.cert_config_A = grpc.ssl_server_certificate_configuration(
405             [(SERVER_KEY_1_PEM, SERVER_CERT_CHAIN_1_PEM)],
406             root_certificates=CA_2_PEM)
407         self.cert_config_B = grpc.ssl_server_certificate_configuration(
408             [(SERVER_KEY_2_PEM, SERVER_CERT_CHAIN_2_PEM)],
409             root_certificates=CA_1_PEM)
410         self.cert_config_fetcher = CertConfigFetcher()
411         server_credentials = grpc.dynamic_ssl_server_credentials(
412             self.cert_config_A,
413             self.cert_config_fetcher,
414             require_client_authentication=True)
415         self.port = self.server.add_secure_port('[::]:0', server_credentials)
416         self.server.start()
417
418     def test_cert_config_reuse(self):
419
420         # succeed with A
421         self.cert_config_fetcher.reset()
422         self.cert_config_fetcher.configure(False, self.cert_config_A)
423         self._do_one_shot_client_rpc(True,
424                                      root_certificates=CA_1_PEM,
425                                      private_key=CLIENT_KEY_2_PEM,
426                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
427         actual_calls = self.cert_config_fetcher.getCalls()
428         self.assertEqual(len(actual_calls), 1)
429         self.assertFalse(actual_calls[0].did_raise)
430         self.assertEqual(actual_calls[0].returned_cert_config,
431                          self.cert_config_A)
432
433         # fail with A
434         self.cert_config_fetcher.reset()
435         self.cert_config_fetcher.configure(False, self.cert_config_A)
436         self._do_one_shot_client_rpc(False,
437                                      root_certificates=CA_2_PEM,
438                                      private_key=CLIENT_KEY_1_PEM,
439                                      certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
440         actual_calls = self.cert_config_fetcher.getCalls()
441         self.assertGreaterEqual(len(actual_calls), 1)
442         self.assertFalse(actual_calls[0].did_raise)
443         for i, call in enumerate(actual_calls):
444             self.assertFalse(call.did_raise, 'i= {}'.format(i))
445             self.assertEqual(call.returned_cert_config, self.cert_config_A,
446                              'i= {}'.format(i))
447
448         # succeed again with A
449         self.cert_config_fetcher.reset()
450         self.cert_config_fetcher.configure(False, self.cert_config_A)
451         self._do_one_shot_client_rpc(True,
452                                      root_certificates=CA_1_PEM,
453                                      private_key=CLIENT_KEY_2_PEM,
454                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
455         actual_calls = self.cert_config_fetcher.getCalls()
456         self.assertEqual(len(actual_calls), 1)
457         self.assertFalse(actual_calls[0].did_raise)
458         self.assertEqual(actual_calls[0].returned_cert_config,
459                          self.cert_config_A)
460
461         # succeed with B
462         self.cert_config_fetcher.reset()
463         self.cert_config_fetcher.configure(False, self.cert_config_B)
464         self._do_one_shot_client_rpc(True,
465                                      root_certificates=CA_2_PEM,
466                                      private_key=CLIENT_KEY_1_PEM,
467                                      certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
468         actual_calls = self.cert_config_fetcher.getCalls()
469         self.assertEqual(len(actual_calls), 1)
470         self.assertFalse(actual_calls[0].did_raise)
471         self.assertEqual(actual_calls[0].returned_cert_config,
472                          self.cert_config_B)
473
474         # fail with B
475         self.cert_config_fetcher.reset()
476         self.cert_config_fetcher.configure(False, self.cert_config_B)
477         self._do_one_shot_client_rpc(False,
478                                      root_certificates=CA_1_PEM,
479                                      private_key=CLIENT_KEY_2_PEM,
480                                      certificate_chain=CLIENT_CERT_CHAIN_2_PEM)
481         actual_calls = self.cert_config_fetcher.getCalls()
482         self.assertGreaterEqual(len(actual_calls), 1)
483         self.assertFalse(actual_calls[0].did_raise)
484         for i, call in enumerate(actual_calls):
485             self.assertFalse(call.did_raise, 'i= {}'.format(i))
486             self.assertEqual(call.returned_cert_config, self.cert_config_B,
487                              'i= {}'.format(i))
488
489         # succeed again with B
490         self.cert_config_fetcher.reset()
491         self.cert_config_fetcher.configure(False, self.cert_config_B)
492         self._do_one_shot_client_rpc(True,
493                                      root_certificates=CA_2_PEM,
494                                      private_key=CLIENT_KEY_1_PEM,
495                                      certificate_chain=CLIENT_CERT_CHAIN_1_PEM)
496         actual_calls = self.cert_config_fetcher.getCalls()
497         self.assertEqual(len(actual_calls), 1)
498         self.assertFalse(actual_calls[0].did_raise)
499         self.assertEqual(actual_calls[0].returned_cert_config,
500                          self.cert_config_B)
501
502
503 if __name__ == '__main__':
504     logging.basicConfig()
505     unittest.main(verbosity=2)