Imported Upstream version 1.18.0
[platform/upstream/grpc.git] / src / python / grpcio_tests / tests / unit / _api_test.py
1 # Copyright 2016 gRPC authors.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """Test of gRPC Python's application-layer API."""
15
16 import unittest
17 import logging
18
19 import six
20
21 import grpc
22
23 from tests.unit import _from_grpc_import_star
24
25
class AllTest(unittest.TestCase):
    """Checks the list of names recorded in ``_from_grpc_import_star``."""

    def testAll(self):
        """Compare ``GRPC_ELEMENTS`` with the expected names, ignoring order.

        ``GRPC_ELEMENTS`` comes from the ``_from_grpc_import_star`` helper
        module (presumably the names bound by ``from grpc import *`` --
        confirm against that module).
        """
        public_api_names = (
            'FutureTimeoutError',
            'FutureCancelledError',
            'Future',
            'ChannelConnectivity',
            'StatusCode',
            'Status',
            'RpcError',
            'RpcContext',
            'Call',
            'ChannelCredentials',
            'CallCredentials',
            'AuthMetadataContext',
            'AuthMetadataPluginCallback',
            'AuthMetadataPlugin',
            'ServerCertificateConfiguration',
            'ServerCredentials',
            'UnaryUnaryMultiCallable',
            'UnaryStreamMultiCallable',
            'StreamUnaryMultiCallable',
            'StreamStreamMultiCallable',
            'UnaryUnaryClientInterceptor',
            'UnaryStreamClientInterceptor',
            'StreamUnaryClientInterceptor',
            'StreamStreamClientInterceptor',
            'Channel',
            'ServicerContext',
            'RpcMethodHandler',
            'HandlerCallDetails',
            'GenericRpcHandler',
            'ServiceRpcHandler',
            'Server',
            'ServerInterceptor',
            'unary_unary_rpc_method_handler',
            'unary_stream_rpc_method_handler',
            'stream_unary_rpc_method_handler',
            'ClientCallDetails',
            'stream_stream_rpc_method_handler',
            'method_handlers_generic_handler',
            'ssl_channel_credentials',
            'metadata_call_credentials',
            'access_token_call_credentials',
            'composite_call_credentials',
            'composite_channel_credentials',
            'ssl_server_credentials',
            'ssl_server_certificate_configuration',
            'dynamic_ssl_server_credentials',
            'channel_ready_future',
            'insecure_channel',
            'secure_channel',
            'intercept_channel',
            'server',
        )

        # Order-insensitive comparison: both sides must hold the same names
        # the same number of times.
        six.assertCountEqual(
            self,
            public_api_names,
            _from_grpc_import_star.GRPC_ELEMENTS,
        )
85
86
class ChannelConnectivityTest(unittest.TestCase):
    """Checks the members of ``grpc.ChannelConnectivity``."""

    def testChannelConnectivity(self):
        """Iterating ``grpc.ChannelConnectivity`` yields the five states in order."""
        expected_states = (
            grpc.ChannelConnectivity.IDLE,
            grpc.ChannelConnectivity.CONNECTING,
            grpc.ChannelConnectivity.READY,
            grpc.ChannelConnectivity.TRANSIENT_FAILURE,
            grpc.ChannelConnectivity.SHUTDOWN,
        )
        actual_states = tuple(grpc.ChannelConnectivity)

        # Sequence comparison, so both membership and ordering are checked.
        self.assertSequenceEqual(expected_states, actual_states)
97
98
class ChannelTest(unittest.TestCase):
    """Smoke test for building and closing a secure channel."""

    def test_secure_channel(self):
        """Open a secure channel to 'google.com:443' and close it.

        No assertions are made; the test passes as long as none of the
        calls raises.
        """
        credentials = grpc.ssl_channel_credentials()
        secure = grpc.secure_channel('google.com:443', credentials)
        secure.close()
105
106
if __name__ == '__main__':
    # Set up default logging configuration before the tests run.
    logging.basicConfig()
    # verbosity=2 makes the runner report each test individually.
    unittest.main(verbosity=2)