Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _auth_context_test.py
1 # Copyright 2017 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Tests exposure of SSL auth context"""
15
16 import pickle
17 import unittest
18 import logging
19
20 import grpc
21 from grpc import _channel
22 from grpc.experimental import session_cache
23 import six
24
25 from tests.unit import test_common
26 from tests.unit import resources
27
# Payloads for the single unary-unary method used by every test.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'

# Fully-qualified path of the method registered under the 'test' service.
_UNARY_UNARY = '/test/UnaryUnary'

# Host name passed to secure channels as the SSL target name override.
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'

# Peer identities the secure client-cert test expects the server to report.
_CLIENT_IDS = (
    b'*.test.google.fr',
    b'waterzooi.test.google.be',
    b'*.test.youtube.com',
    b'192.168.1.3',
)

# Keys of the dict that the handler pickles into its response.
_ID = 'id'
_ID_KEY = 'id_key'
_AUTH_CTX = 'auth_ctx'

# Credential material loaded from the shared test resources.
_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()

# One (private key, certificate chain) pair for the server credentials.
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)

# Channel options applied to every secure channel created by the tests.
_PROPERTY_OPTIONS = (
    ('grpc.ssl_target_name_override', _SERVER_HOST_OVERRIDE),
)
52
53
def handle_unary_unary(request, servicer_context):
    """Report what the servicer context knows about the calling peer.

    The request payload is not inspected. The reply is a pickled dict that
    maps ``_ID``, ``_ID_KEY`` and ``_AUTH_CTX`` to the peer identities, the
    peer identity key and the auth context returned by ``servicer_context``.
    """
    peer_report = {}
    peer_report[_ID] = servicer_context.peer_identities()
    peer_report[_ID_KEY] = servicer_context.peer_identity_key()
    peer_report[_AUTH_CTX] = servicer_context.auth_context()
    return pickle.dumps(peer_report)
60
61
class AuthContextTest(unittest.TestCase):
    """Checks the peer identity and auth context exposed to a servicer.

    Each test starts a server whose only method pickles what its servicer
    context reports about the peer, performs one call over a channel, and
    asserts on the unpickled reply.
    """

    def _start_server(self, server_credentials=None):
        """Start a test server serving ``_UNARY_UNARY`` and return its port.

        Args:
          server_credentials: Server credentials for a secure port, or
            ``None`` to listen on an insecure port.

        Returns:
          The port the server was bound to.
        """
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
                grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        if server_credentials is None:
            port = server.add_insecure_port('[::]:0')
        else:
            port = server.add_secure_port('[::]:0', server_credentials)
        server.start()
        # Registered as a cleanup so the server is stopped even when the RPC
        # or one of the test's assertions raises.
        self.addCleanup(server.stop, None)
        return port

    def _fetch_auth_data(self, channel_creds, channel_options, port):
        """Call the test method once over a fresh secure channel.

        Args:
          channel_creds: Channel credentials for the secure channel.
          channel_options: Options passed to ``grpc.secure_channel``.
          port: Port of the server on localhost.

        Returns:
          The dict unpickled from the server's response.
        """
        # The context manager closes the channel even if the call fails.
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=channel_options) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        # The payload was produced by the server started in this test, so
        # unpickling it does not process untrusted data.
        return pickle.loads(response)

    def testInsecure(self):
        """An insecure channel yields no identity and an empty auth context."""
        port = self._start_server()

        with grpc.insecure_channel('localhost:%d' % port) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)

        auth_data = pickle.loads(response)
        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({}, auth_data[_AUTH_CTX])

    def testSecureNoCert(self):
        """SSL without a client certificate yields no peer identity."""
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        auth_data = self._fetch_auth_data(channel_creds, _PROPERTY_OPTIONS,
                                          port)

        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                'security_level': [b'TSI_PRIVACY_AND_INTEGRITY'],
                'transport_security_type': [b'ssl'],
                'ssl_session_reused': [b'false'],
            }, auth_data[_AUTH_CTX])

    def testSecureClientCert(self):
        """SSL with a required client certificate exposes the client's IDs."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True)
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        auth_data = self._fetch_auth_data(channel_creds, _PROPERTY_OPTIONS,
                                          port)

        auth_ctx = auth_data[_AUTH_CTX]
        six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
        self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
        self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
        self.assertSequenceEqual([b'*.test.google.com'],
                                 auth_ctx['x509_common_name'])

    def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
                                expect_ssl_session_reused):
        """Make one call on a new channel and check ``ssl_session_reused``.

        Args:
          channel_creds: Channel credentials for the secure channel.
          channel_options: Options passed to ``grpc.secure_channel``.
          port: Port of the server on localhost.
          expect_ssl_session_reused: Expected value of the auth context's
            ``ssl_session_reused`` entry.
        """
        auth_data = self._fetch_auth_data(channel_creds, channel_options, port)
        self.assertEqual(expect_ssl_session_reused,
                         auth_data[_AUTH_CTX]['ssl_session_reused'])

    def testSessionResumption(self):
        """A second channel sharing the session cache reports a reused session."""
        # Set up a secure server
        port = self._start_server(grpc.ssl_server_credentials(_SERVER_CERTS))

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'true'])
189
190
if __name__ == '__main__':
    # Configure the root logger before running so log records emitted
    # during the tests have a handler.
    logging.basicConfig()
    unittest.main(verbosity=2)