Imported Upstream version 7.59.0
[platform/upstream/curl.git] / tests / negtelnetserver.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 """ A telnet server which negotiates"""
5
6 from __future__ import (absolute_import, division, print_function,
7                         unicode_literals)
8 import argparse
9 import os
10 import sys
11 import logging
12 import struct
13 try:  # Python 2
14     import SocketServer as socketserver
15 except ImportError:  # Python 3
16     import socketserver
17
18
19 log = logging.getLogger(__name__)
20 HOST = "localhost"
21 IDENT = "NTEL"
22
23
24 # The strings that indicate the test framework is checking our aliveness
25 VERIFIED_REQ = b"verifiedserver"
26 VERIFIED_RSP = b"WE ROOLZ: {pid}"
27
28
29 def telnetserver(options):
30     """
31     Starts up a TCP server with a telnet handler and serves DICT requests
32     forever.
33     """
34     if options.pidfile:
35         pid = os.getpid()
36         with open(options.pidfile, "w") as f:
37             f.write(b"{0}".format(pid))
38
39     local_bind = (HOST, options.port)
40     log.info("Listening on %s", local_bind)
41
42     # Need to set the allow_reuse on the class, not on the instance.
43     socketserver.TCPServer.allow_reuse_address = True
44     server = socketserver.TCPServer(local_bind, NegotiatingTelnetHandler)
45     server.serve_forever()
46
47     return ScriptRC.SUCCESS
48
49
50 class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
51     """Handler class for Telnet connections.
52
53     """
54     def handle(self):
55         """
56         Negotiates options before reading data.
57         """
58         neg = Negotiator(self.request)
59
60         try:
61             # Send some initial negotiations.
62             neg.send_do("NEW_ENVIRON")
63             neg.send_will("NEW_ENVIRON")
64             neg.send_dont("NAWS")
65             neg.send_wont("NAWS")
66
67             # Get the data passed through the negotiator
68             data = neg.recv(1024)
69             log.debug("Incoming data: %r", data)
70
71             if VERIFIED_REQ in data:
72                 log.debug("Received verification request from test framework")
73                 response_data = VERIFIED_RSP.format(pid=os.getpid())
74             else:
75                 log.debug("Received normal request - echoing back")
76                 response_data = data.strip()
77
78             if response_data:
79                 log.debug("Sending %r", response_data)
80                 self.request.sendall(response_data)
81
82         except IOError:
83             log.exception("IOError hit during request")
84
85
86 class Negotiator(object):
87     NO_NEG = 0
88     START_NEG = 1
89     WILL = 2
90     WONT = 3
91     DO = 4
92     DONT = 5
93
94     def __init__(self, tcp):
95         self.tcp = tcp
96         self.state = self.NO_NEG
97
98     def recv(self, bytes):
99         """
100         Read bytes from TCP, handling negotiation sequences
101
102         :param bytes: Number of bytes to read
103         :return: a buffer of bytes
104         """
105         buffer = bytearray()
106
107         # If we keep receiving negotiation sequences, we won't fill the buffer.
108         # Keep looping while we can, and until we have something to give back
109         # to the caller.
110         while len(buffer) == 0:
111             data = self.tcp.recv(bytes)
112             if not data:
113                 # TCP failed to give us any data. Break out.
114                 break
115
116             for byte in data:
117                 byte_int = self.byte_to_int(byte)
118
119                 if self.state == self.NO_NEG:
120                     self.no_neg(byte, byte_int, buffer)
121                 elif self.state == self.START_NEG:
122                     self.start_neg(byte_int)
123                 elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]:
124                     self.handle_option(byte_int)
125                 else:
126                     # Received an unexpected byte. Stop negotiations
127                     log.error("Unexpected byte %s in state %s",
128                               byte_int,
129                               self.state)
130                     self.state = self.NO_NEG
131
132         return buffer
133
134     def byte_to_int(self, byte):
135         return struct.unpack(b'B', byte)[0]
136
137     def no_neg(self, byte, byte_int, buffer):
138         # Not negotiating anything thus far. Check to see if we
139         # should.
140         if byte_int == NegTokens.IAC:
141             # Start negotiation
142             log.debug("Starting negotiation (IAC)")
143             self.state = self.START_NEG
144         else:
145             # Just append the incoming byte to the buffer
146             buffer.append(byte)
147
148     def start_neg(self, byte_int):
149         # In a negotiation.
150         log.debug("In negotiation (%s)",
151                   NegTokens.from_val(byte_int))
152
153         if byte_int == NegTokens.WILL:
154             # Client is confirming they are willing to do an option
155             log.debug("Client is willing")
156             self.state = self.WILL
157         elif byte_int == NegTokens.WONT:
158             # Client is confirming they are unwilling to do an
159             # option
160             log.debug("Client is unwilling")
161             self.state = self.WONT
162         elif byte_int == NegTokens.DO:
163             # Client is indicating they can do an option
164             log.debug("Client can do")
165             self.state = self.DO
166         elif byte_int == NegTokens.DONT:
167             # Client is indicating they can't do an option
168             log.debug("Client can't do")
169             self.state = self.DONT
170         else:
171             # Received an unexpected byte. Stop negotiations
172             log.error("Unexpected byte %s in state %s",
173                       byte_int,
174                       self.state)
175             self.state = self.NO_NEG
176
177     def handle_option(self, byte_int):
178         if byte_int in [NegOptions.BINARY,
179                         NegOptions.CHARSET,
180                         NegOptions.SUPPRESS_GO_AHEAD,
181                         NegOptions.NAWS,
182                         NegOptions.NEW_ENVIRON]:
183             log.debug("Option: %s", NegOptions.from_val(byte_int))
184
185             # No further negotiation of this option needed. Reset the state.
186             self.state = self.NO_NEG
187
188         else:
189             # Received an unexpected byte. Stop negotiations
190             log.error("Unexpected byte %s in state %s",
191                       byte_int,
192                       self.state)
193             self.state = self.NO_NEG
194
195     def send_message(self, message):
196         packed_message = self.pack(message)
197         self.tcp.sendall(packed_message)
198
199     def pack(self, arr):
200         return struct.pack(b'{0}B'.format(len(arr)), *arr)
201
202     def send_iac(self, arr):
203         message = [NegTokens.IAC]
204         message.extend(arr)
205         self.send_message(message)
206
207     def send_do(self, option_str):
208         log.debug("Sending DO %s", option_str)
209         self.send_iac([NegTokens.DO, NegOptions.to_val(option_str)])
210
211     def send_dont(self, option_str):
212         log.debug("Sending DONT %s", option_str)
213         self.send_iac([NegTokens.DONT, NegOptions.to_val(option_str)])
214
215     def send_will(self, option_str):
216         log.debug("Sending WILL %s", option_str)
217         self.send_iac([NegTokens.WILL, NegOptions.to_val(option_str)])
218
219     def send_wont(self, option_str):
220         log.debug("Sending WONT %s", option_str)
221         self.send_iac([NegTokens.WONT, NegOptions.to_val(option_str)])
222
223
224 class NegBase(object):
225     @classmethod
226     def to_val(cls, name):
227         return getattr(cls, name)
228
229     @classmethod
230     def from_val(cls, val):
231         for k in cls.__dict__.keys():
232             if getattr(cls, k) == val:
233                 return k
234
235         return "<unknown>"
236
237
238 class NegTokens(NegBase):
239     # The start of a negotiation sequence
240     IAC = 255
241     # Confirm willingness to negotiate
242     WILL = 251
243     # Confirm unwillingness to negotiate
244     WONT = 252
245     # Indicate willingness to negotiate
246     DO = 253
247     # Indicate unwillingness to negotiate
248     DONT = 254
249
250     # The start of sub-negotiation options.
251     SB = 250
252     # The end of sub-negotiation options.
253     SE = 240
254
255
256 class NegOptions(NegBase):
257     # Binary Transmission
258     BINARY = 0
259     # Suppress Go Ahead
260     SUPPRESS_GO_AHEAD = 3
261     # NAWS - width and height of client
262     NAWS = 31
263     # NEW-ENVIRON - environment variables on client
264     NEW_ENVIRON = 39
265     # Charset option
266     CHARSET = 42
267
268
269 def get_options():
270     parser = argparse.ArgumentParser()
271
272     parser.add_argument("--port", action="store", default=9019,
273                         type=int, help="port to listen on")
274     parser.add_argument("--verbose", action="store", type=int, default=0,
275                         help="verbose output")
276     parser.add_argument("--pidfile", action="store",
277                         help="file name for the PID")
278     parser.add_argument("--logfile", action="store",
279                         help="file name for the log")
280     parser.add_argument("--srcdir", action="store", help="test directory")
281     parser.add_argument("--id", action="store", help="server ID")
282     parser.add_argument("--ipv4", action="store_true", default=0,
283                         help="IPv4 flag")
284
285     return parser.parse_args()
286
287
288 def setup_logging(options):
289     """
290     Set up logging from the command line options
291     """
292     root_logger = logging.getLogger()
293     add_stdout = False
294
295     formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s "
296                                   "[{ident}] %(message)s"
297                                   .format(ident=IDENT))
298
299     # Write out to a logfile
300     if options.logfile:
301         handler = logging.FileHandler(options.logfile, mode="w")
302         handler.setFormatter(formatter)
303         handler.setLevel(logging.DEBUG)
304         root_logger.addHandler(handler)
305     else:
306         # The logfile wasn't specified. Add a stdout logger.
307         add_stdout = True
308
309     if options.verbose:
310         # Add a stdout logger as well in verbose mode
311         root_logger.setLevel(logging.DEBUG)
312         add_stdout = True
313     else:
314         root_logger.setLevel(logging.INFO)
315
316     if add_stdout:
317         stdout_handler = logging.StreamHandler(sys.stdout)
318         stdout_handler.setFormatter(formatter)
319         stdout_handler.setLevel(logging.DEBUG)
320         root_logger.addHandler(stdout_handler)
321
322
323 class ScriptRC(object):
324     """Enum for script return codes"""
325     SUCCESS = 0
326     FAILURE = 1
327     EXCEPTION = 2
328
329
330 class ScriptException(Exception):
331     pass
332
333
334 if __name__ == '__main__':
335     # Get the options from the user.
336     options = get_options()
337
338     # Setup logging using the user options
339     setup_logging(options)
340
341     # Run main script.
342     try:
343         rc = telnetserver(options)
344     except Exception as e:
345         log.exception(e)
346         rc = ScriptRC.EXCEPTION
347
348     log.info("Returning %d", rc)
349     sys.exit(rc)