# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests exposure of SSL auth context"""
import logging
import pickle
import unittest

import grpc
from grpc import _channel
from grpc.experimental import session_cache
import six

from tests.unit import resources
from tests.unit import test_common
# Opaque payloads for the test RPC.
_REQUEST = b'\x00\x00\x00'
_RESPONSE = b'\x00\x00\x00'

# Fully-qualified path of the only RPC method the test server exposes.
_UNARY_UNARY = '/test/UnaryUnary'

# Host name sent as the TLS target-name override on secure channels.
_SERVER_HOST_OVERRIDE = 'foo.test.google.fr'

# Peer identities expected when the client presents its certificate.
# NOTE(review): the first and last entries were reconstructed, not read from
# this file; confirm the tuple against the subject alternative names of the
# certificate returned by resources.certificate_chain().
_CLIENT_IDS = (
    b'*.test.google.fr',
    b'waterzooi.test.google.be',
    b'*.test.youtube.com',
    b'192.168.1.3',
)

# Keys of the dict that handle_unary_unary pickles back to the client.
# NOTE(review): the values of _ID and _ID_KEY were chosen to mirror _AUTH_CTX;
# any distinct strings work because both sides only use them as dict keys.
_ID = 'id'
_ID_KEY = 'id_key'
_AUTH_CTX = 'auth_ctx'

_PRIVATE_KEY = resources.private_key()
_CERTIFICATE_CHAIN = resources.certificate_chain()
_TEST_ROOT_CERTIFICATES = resources.test_root_certificates()
# One (private key, certificate chain) pair, as ssl_server_credentials expects.
_SERVER_CERTS = ((_PRIVATE_KEY, _CERTIFICATE_CHAIN),)
# Channel options applied to every secure channel in this module.
_PROPERTY_OPTIONS = ((
    'grpc.ssl_target_name_override',
    _SERVER_HOST_OVERRIDE,
),)
def handle_unary_unary(request, servicer_context):
    """Reply with a pickled snapshot of the RPC's authentication state.

    Args:
        request: The request payload; it is not inspected.
        servicer_context: The servicer context of the RPC being handled.

    Returns:
        The ``pickle.dumps`` encoding of a dict holding the context's peer
        identities under ``_ID``, its peer identity key under ``_ID_KEY`` and
        its auth context under ``_AUTH_CTX``.
    """
    # The tests decode the response with pickle.loads and index it by these
    # three keys, so the snapshot has to be returned as a pickled dict.
    return pickle.dumps({
        _ID: servicer_context.peer_identities(),
        _ID_KEY: servicer_context.peer_identity_key(),
        _AUTH_CTX: servicer_context.auth_context(),
    })
class AuthContextTest(unittest.TestCase):
    """Checks the auth data a servicer observes over insecure and TLS channels.

    Each test starts an in-process server whose only method answers with a
    pickled dict of the servicer context's peer identities, peer identity key
    and auth context, issues one or more RPCs, and asserts on that dict.
    """

    def _start_server(self, server_credentials=None):
        """Start a server exposing ``handle_unary_unary`` and return its port.

        Args:
            server_credentials: Server credentials for a secure port, or
                ``None`` to listen on an insecure port.

        Returns:
            The port number the server was bound to.
        """
        # The handler dict is keyed by the bare method name; combined with the
        # service name 'test' it yields the _UNARY_UNARY path the clients call.
        handler = grpc.method_handlers_generic_handler('test', {
            'UnaryUnary':
                grpc.unary_unary_rpc_method_handler(handle_unary_unary)
        })
        server = test_common.test_server()
        server.add_generic_rpc_handlers((handler,))
        if server_credentials is None:
            port = server.add_insecure_port('[::]:0')
        else:
            port = server.add_secure_port('[::]:0', server_credentials)
        server.start()
        # Registered as a cleanup so the server is stopped even when an
        # assertion in the test body fails.
        self.addCleanup(server.stop, None)
        return port

    def testInsecure(self):
        """An insecure channel yields no peer identity and an empty context."""
        port = self._start_server()

        with grpc.insecure_channel('localhost:%d' % port) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)

        auth_data = pickle.loads(response)
        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual({}, auth_data[_AUTH_CTX])

    def testSecureNoCert(self):
        """TLS without a client certificate yields transport properties only."""
        server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=_PROPERTY_OPTIONS) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)

        auth_data = pickle.loads(response)
        self.assertIsNone(auth_data[_ID])
        self.assertIsNone(auth_data[_ID_KEY])
        self.assertDictEqual(
            {
                'security_level': [b'TSI_PRIVACY_AND_INTEGRITY'],
                'transport_security_type': [b'ssl'],
                'ssl_session_reused': [b'false'],
            }, auth_data[_AUTH_CTX])

    def testSecureClientCert(self):
        """TLS with a required client certificate exposes the client's ids."""
        server_cred = grpc.ssl_server_credentials(
            _SERVER_CERTS,
            root_certificates=_TEST_ROOT_CERTIFICATES,
            require_client_auth=True)
        port = self._start_server(server_cred)

        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES,
            private_key=_PRIVATE_KEY,
            certificate_chain=_CERTIFICATE_CHAIN)
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=_PROPERTY_OPTIONS) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)

        auth_data = pickle.loads(response)
        auth_ctx = auth_data[_AUTH_CTX]
        six.assertCountEqual(self, _CLIENT_IDS, auth_data[_ID])
        self.assertEqual('x509_subject_alternative_name', auth_data[_ID_KEY])
        self.assertSequenceEqual([b'ssl'], auth_ctx['transport_security_type'])
        self.assertSequenceEqual([b'*.test.google.com'],
                                 auth_ctx['x509_common_name'])

    def _do_one_shot_client_rpc(self, channel_creds, channel_options, port,
                                expect_ssl_session_reused):
        """Issue one RPC on a fresh secure channel and check session reuse.

        Args:
            channel_creds: Channel credentials for the new channel.
            channel_options: Channel options for the new channel.
            port: Port of the server on localhost.
            expect_ssl_session_reused: Value the auth context is expected to
                report under ``'ssl_session_reused'``.
        """
        # The context manager closes the channel even if the RPC raises.
        with grpc.secure_channel('localhost:{}'.format(port),
                                 channel_creds,
                                 options=channel_options) as channel:
            response = channel.unary_unary(_UNARY_UNARY)(_REQUEST)
        auth_data = pickle.loads(response)
        self.assertEqual(expect_ssl_session_reused,
                         auth_data[_AUTH_CTX]['ssl_session_reused'])

    def testSessionResumption(self):
        """A second connection sharing the session cache resumes the session."""
        # Set up a secure server
        server_cred = grpc.ssl_server_credentials(_SERVER_CERTS)
        port = self._start_server(server_cred)

        # Create a cache for TLS session tickets
        cache = session_cache.ssl_session_cache_lru(1)
        channel_creds = grpc.ssl_channel_credentials(
            root_certificates=_TEST_ROOT_CERTIFICATES)
        channel_options = _PROPERTY_OPTIONS + (
            ('grpc.ssl_session_cache', cache),)

        # Initial connection has no session to resume
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'false'])

        # Subsequent connections resume sessions
        self._do_one_shot_client_rpc(channel_creds,
                                     channel_options,
                                     port,
                                     expect_ssl_session_reused=[b'true'])
if __name__ == '__main__':
    # Set up default logging before running the tests with verbose output.
    logging.basicConfig()
    unittest.main(verbosity=2)