Imported Upstream version 1.27.0
[platform/upstream/grpc.git] / tools / run_tests / python_utils / port_server.py
1 #!/usr/bin/env python2.7
2 # Copyright 2015 gRPC authors.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Manage TCP ports for unit tests; started by run_tests.py"""
16
17 from __future__ import print_function
18
19 import argparse
20 from six.moves.BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
21 from six.moves.socketserver import ThreadingMixIn
22 import hashlib
23 import os
24 import socket
25 import sys
26 import time
27 import random
28 import threading
29 import platform
30
31 # increment this number whenever making a change to ensure that
32 # the changes are picked up by running CI servers
33 # note that all changes must be backwards compatible
34 _MY_VERSION = 20
35
36 if len(sys.argv) == 2 and sys.argv[1] == 'dump_version':
37     print(_MY_VERSION)
38     sys.exit(0)
39
40 argp = argparse.ArgumentParser(description='Server for httpcli_test')
41 argp.add_argument('-p', '--port', default=12345, type=int)
42 argp.add_argument('-l', '--logfile', default=None, type=str)
43 args = argp.parse_args()
44
45 if args.logfile is not None:
46     sys.stdin.close()
47     sys.stderr.close()
48     sys.stdout.close()
49     sys.stderr = open(args.logfile, 'w')
50     sys.stdout = sys.stderr
51
52 print('port server running on port %d' % args.port)
53
54 pool = []
55 in_use = {}
56 mu = threading.Lock()
57
58 # Cronet restricts the following ports to be used (see
59 # https://cs.chromium.org/chromium/src/net/base/port_util.cc). When one of these
60 # ports is used in a Cronet test, the test would fail (see issue #12149). These
61 # ports must be excluded from pool.
62 cronet_restricted_ports = [
63     1, 7, 9, 11, 13, 15, 17, 19, 20, 21, 22, 23, 25, 37, 42, 43, 53, 77, 79, 87,
64     95, 101, 102, 103, 104, 109, 110, 111, 113, 115, 117, 119, 123, 135, 139,
65     143, 179, 389, 465, 512, 513, 514, 515, 526, 530, 531, 532, 540, 556, 563,
66     587, 601, 636, 993, 995, 2049, 3659, 4045, 6000, 6665, 6666, 6667, 6668,
67     6669, 6697
68 ]
69
70
71 def can_connect(port):
72     # this test is only really useful on unices where SO_REUSE_PORT is available
73     # so on Windows, where this test is expensive, skip it
74     if platform.system() == 'Windows': return False
75     s = socket.socket()
76     try:
77         s.connect(('localhost', port))
78         return True
79     except socket.error as e:
80         return False
81     finally:
82         s.close()
83
84
85 def can_bind(port, proto):
86     s = socket.socket(proto, socket.SOCK_STREAM)
87     s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
88     try:
89         s.bind(('localhost', port))
90         return True
91     except socket.error as e:
92         return False
93     finally:
94         s.close()
95
96
97 def refill_pool(max_timeout, req):
98     """Scan for ports not marked for being in use"""
99     chk = [
100         port for port in range(1025, 32766)
101         if port not in cronet_restricted_ports
102     ]
103     random.shuffle(chk)
104     for i in chk:
105         if len(pool) > 100: break
106         if i in in_use:
107             age = time.time() - in_use[i]
108             if age < max_timeout:
109                 continue
110             req.log_message("kill old request %d" % i)
111             del in_use[i]
112         if can_bind(i, socket.AF_INET) and can_bind(
113                 i, socket.AF_INET6) and not can_connect(i):
114             req.log_message("found available port %d" % i)
115             pool.append(i)
116
117
118 def allocate_port(req):
119     global pool
120     global in_use
121     global mu
122     mu.acquire()
123     max_timeout = 600
124     while not pool:
125         refill_pool(max_timeout, req)
126         if not pool:
127             req.log_message("failed to find ports: retrying soon")
128             mu.release()
129             time.sleep(1)
130             mu.acquire()
131             max_timeout /= 2
132     port = pool[0]
133     pool = pool[1:]
134     in_use[port] = time.time()
135     mu.release()
136     return port
137
138
139 keep_running = True
140
141
142 class Handler(BaseHTTPRequestHandler):
143
144     def setup(self):
145         # If the client is unreachable for 5 seconds, close the connection
146         self.timeout = 5
147         BaseHTTPRequestHandler.setup(self)
148
149     def do_GET(self):
150         global keep_running
151         global mu
152         if self.path == '/get':
153             # allocate a new port, it will stay bound for ten minutes and until
154             # it's unused
155             self.send_response(200)
156             self.send_header('Content-Type', 'text/plain')
157             self.end_headers()
158             p = allocate_port(self)
159             self.log_message('allocated port %d' % p)
160             self.wfile.write('%d' % p)
161         elif self.path[0:6] == '/drop/':
162             self.send_response(200)
163             self.send_header('Content-Type', 'text/plain')
164             self.end_headers()
165             p = int(self.path[6:])
166             mu.acquire()
167             if p in in_use:
168                 del in_use[p]
169                 pool.append(p)
170                 k = 'known'
171             else:
172                 k = 'unknown'
173             mu.release()
174             self.log_message('drop %s port %d' % (k, p))
175         elif self.path == '/version_number':
176             # fetch a version string and the current process pid
177             self.send_response(200)
178             self.send_header('Content-Type', 'text/plain')
179             self.end_headers()
180             self.wfile.write(_MY_VERSION)
181         elif self.path == '/dump':
182             # yaml module is not installed on Macs and Windows machines by default
183             # so we import it lazily (/dump action is only used for debugging)
184             import yaml
185             self.send_response(200)
186             self.send_header('Content-Type', 'text/plain')
187             self.end_headers()
188             mu.acquire()
189             now = time.time()
190             out = yaml.dump({
191                 'pool': pool,
192                 'in_use': dict((k, now - v) for k, v in in_use.items())
193             })
194             mu.release()
195             self.wfile.write(out)
196         elif self.path == '/quitquitquit':
197             self.send_response(200)
198             self.end_headers()
199             self.server.shutdown()
200
201
202 class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
203     """Handle requests in a separate thread"""
204
205
206 ThreadedHTTPServer(('', args.port), Handler).serve_forever()