Disable a debug option
[platform/upstream/curl.git] / tests / negtelnetserver.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 #  Project                     ___| | | |  _ \| |
5 #                             / __| | | | |_) | |
6 #                            | (__| |_| |  _ <| |___
7 #                             \___|\___/|_| \_\_____|
8 #
9 # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
10 #
11 # This software is licensed as described in the file COPYING, which
12 # you should have received as part of this distribution. The terms
13 # are also available at https://curl.se/docs/copyright.html.
14 #
15 # You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 # copies of the Software, and permit persons to whom the Software is
17 # furnished to do so, under the terms of the COPYING file.
18 #
19 # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 # KIND, either express or implied.
21 #
22 # SPDX-License-Identifier: curl
23 #
24 """ A telnet server which negotiates"""
25
26 from __future__ import (absolute_import, division, print_function,
27                         unicode_literals)
28
29 import argparse
30 import logging
31 import os
32 import socket
33 import sys
34 import time
35
36 from util import ClosingFileHandler
37
38 if sys.version_info.major >= 3:
39     import socketserver
40 else:
41     import SocketServer as socketserver
42
43 log = logging.getLogger(__name__)
44 HOST = "localhost"
45 IDENT = "NTEL"
46
47
48 # The strings that indicate the test framework is checking our aliveness
49 VERIFIED_REQ = "verifiedserver"
50 VERIFIED_RSP = "WE ROOLZ: {pid}"
51
52
53 def telnetserver(options):
54     """
55     Starts up a TCP server with a telnet handler and serves DICT requests
56     forever.
57     """
58     if options.pidfile:
59         pid = os.getpid()
60         # see tests/server/util.c function write_pidfile
61         if os.name == "nt":
62             pid += 65536
63         with open(options.pidfile, "w") as f:
64             f.write(str(pid))
65
66     local_bind = (HOST, options.port)
67     log.info("Listening on %s", local_bind)
68
69     # Need to set the allow_reuse on the class, not on the instance.
70     socketserver.TCPServer.allow_reuse_address = True
71     with socketserver.TCPServer(local_bind, NegotiatingTelnetHandler) as server:
72         server.serve_forever()
73     # leaving `with` calls server.close() automatically
74     return ScriptRC.SUCCESS
75
76
77 class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
78     """Handler class for Telnet connections.
79
80     """
81     def handle(self):
82         """
83         Negotiates options before reading data.
84         """
85         neg = Negotiator(self.request)
86
87         try:
88             # Send some initial negotiations.
89             neg.send_do("NEW_ENVIRON")
90             neg.send_will("NEW_ENVIRON")
91             neg.send_dont("NAWS")
92             neg.send_wont("NAWS")
93
94             # Get the data passed through the negotiator
95             data = neg.recv(4*1024)
96             log.debug("Incoming data: %r", data)
97
98             if VERIFIED_REQ.encode('utf-8') in data:
99                 log.debug("Received verification request from test framework")
100                 pid = os.getpid()
101                 # see tests/server/util.c function write_pidfile
102                 if os.name == "nt":
103                     pid += 65536
104                 response = VERIFIED_RSP.format(pid=pid)
105                 response_data = response.encode('utf-8')
106             else:
107                 log.debug("Received normal request - echoing back")
108                 response_data = data.decode('utf-8').strip().encode('utf-8')
109
110             if response_data:
111                 log.debug("Sending %r", response_data)
112                 self.request.sendall(response_data)
113
114             # put some effort into making a clean socket shutdown
115             # that does not give the client ECONNRESET
116             self.request.settimeout(0.1)
117             self.request.recv(4*1024)
118             self.request.shutdown(socket.SHUT_RDWR)
119
120         except IOError:
121             log.exception("IOError hit during request")
122
123
124 class Negotiator(object):
125     NO_NEG = 0
126     START_NEG = 1
127     WILL = 2
128     WONT = 3
129     DO = 4
130     DONT = 5
131
132     def __init__(self, tcp):
133         self.tcp = tcp
134         self.state = self.NO_NEG
135
136     def recv(self, bytes):
137         """
138         Read bytes from TCP, handling negotiation sequences
139
140         :param bytes: Number of bytes to read
141         :return: a buffer of bytes
142         """
143         buffer = bytearray()
144
145         # If we keep receiving negotiation sequences, we won't fill the buffer.
146         # Keep looping while we can, and until we have something to give back
147         # to the caller.
148         while len(buffer) == 0:
149             data = self.tcp.recv(bytes)
150             if not data:
151                 # TCP failed to give us any data. Break out.
152                 break
153
154             for byte_int in bytearray(data):
155                 if self.state == self.NO_NEG:
156                     self.no_neg(byte_int, buffer)
157                 elif self.state == self.START_NEG:
158                     self.start_neg(byte_int)
159                 elif self.state in [self.WILL, self.WONT, self.DO, self.DONT]:
160                     self.handle_option(byte_int)
161                 else:
162                     # Received an unexpected byte. Stop negotiations
163                     log.error("Unexpected byte %s in state %s",
164                               byte_int,
165                               self.state)
166                     self.state = self.NO_NEG
167
168         return buffer
169
170     def no_neg(self, byte_int, buffer):
171         # Not negotiating anything thus far. Check to see if we
172         # should.
173         if byte_int == NegTokens.IAC:
174             # Start negotiation
175             log.debug("Starting negotiation (IAC)")
176             self.state = self.START_NEG
177         else:
178             # Just append the incoming byte to the buffer
179             buffer.append(byte_int)
180
181     def start_neg(self, byte_int):
182         # In a negotiation.
183         log.debug("In negotiation (%s)",
184                   NegTokens.from_val(byte_int))
185
186         if byte_int == NegTokens.WILL:
187             # Client is confirming they are willing to do an option
188             log.debug("Client is willing")
189             self.state = self.WILL
190         elif byte_int == NegTokens.WONT:
191             # Client is confirming they are unwilling to do an
192             # option
193             log.debug("Client is unwilling")
194             self.state = self.WONT
195         elif byte_int == NegTokens.DO:
196             # Client is indicating they can do an option
197             log.debug("Client can do")
198             self.state = self.DO
199         elif byte_int == NegTokens.DONT:
200             # Client is indicating they can't do an option
201             log.debug("Client can't do")
202             self.state = self.DONT
203         else:
204             # Received an unexpected byte. Stop negotiations
205             log.error("Unexpected byte %s in state %s",
206                       byte_int,
207                       self.state)
208             self.state = self.NO_NEG
209
210     def handle_option(self, byte_int):
211         if byte_int in [NegOptions.BINARY,
212                         NegOptions.CHARSET,
213                         NegOptions.SUPPRESS_GO_AHEAD,
214                         NegOptions.NAWS,
215                         NegOptions.NEW_ENVIRON]:
216             log.debug("Option: %s", NegOptions.from_val(byte_int))
217
218             # No further negotiation of this option needed. Reset the state.
219             self.state = self.NO_NEG
220
221         else:
222             # Received an unexpected byte. Stop negotiations
223             log.error("Unexpected byte %s in state %s",
224                       byte_int,
225                       self.state)
226             self.state = self.NO_NEG
227
228     def send_message(self, message_ints):
229         self.tcp.sendall(bytearray(message_ints))
230
231     def send_iac(self, arr):
232         message = [NegTokens.IAC]
233         message.extend(arr)
234         self.send_message(message)
235
236     def send_do(self, option_str):
237         log.debug("Sending DO %s", option_str)
238         self.send_iac([NegTokens.DO, NegOptions.to_val(option_str)])
239
240     def send_dont(self, option_str):
241         log.debug("Sending DONT %s", option_str)
242         self.send_iac([NegTokens.DONT, NegOptions.to_val(option_str)])
243
244     def send_will(self, option_str):
245         log.debug("Sending WILL %s", option_str)
246         self.send_iac([NegTokens.WILL, NegOptions.to_val(option_str)])
247
248     def send_wont(self, option_str):
249         log.debug("Sending WONT %s", option_str)
250         self.send_iac([NegTokens.WONT, NegOptions.to_val(option_str)])
251
252
253 class NegBase(object):
254     @classmethod
255     def to_val(cls, name):
256         return getattr(cls, name)
257
258     @classmethod
259     def from_val(cls, val):
260         for k in cls.__dict__.keys():
261             if getattr(cls, k) == val:
262                 return k
263
264         return "<unknown>"
265
266
267 class NegTokens(NegBase):
268     # The start of a negotiation sequence
269     IAC = 255
270     # Confirm willingness to negotiate
271     WILL = 251
272     # Confirm unwillingness to negotiate
273     WONT = 252
274     # Indicate willingness to negotiate
275     DO = 253
276     # Indicate unwillingness to negotiate
277     DONT = 254
278
279     # The start of sub-negotiation options.
280     SB = 250
281     # The end of sub-negotiation options.
282     SE = 240
283
284
285 class NegOptions(NegBase):
286     # Binary Transmission
287     BINARY = 0
288     # Suppress Go Ahead
289     SUPPRESS_GO_AHEAD = 3
290     # NAWS - width and height of client
291     NAWS = 31
292     # NEW-ENVIRON - environment variables on client
293     NEW_ENVIRON = 39
294     # Charset option
295     CHARSET = 42
296
297
298 def get_options():
299     parser = argparse.ArgumentParser()
300
301     parser.add_argument("--port", action="store", default=9019,
302                         type=int, help="port to listen on")
303     parser.add_argument("--verbose", action="store", type=int, default=0,
304                         help="verbose output")
305     parser.add_argument("--pidfile", action="store",
306                         help="file name for the PID")
307     parser.add_argument("--logfile", action="store",
308                         help="file name for the log")
309     parser.add_argument("--srcdir", action="store", help="test directory")
310     parser.add_argument("--id", action="store", help="server ID")
311     parser.add_argument("--ipv4", action="store_true", default=0,
312                         help="IPv4 flag")
313
314     return parser.parse_args()
315
316
317 def setup_logging(options):
318     """
319     Set up logging from the command line options
320     """
321     root_logger = logging.getLogger()
322     add_stdout = False
323
324     formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s "
325                                   "[{ident}] %(message)s"
326                                   .format(ident=IDENT))
327
328     # Write out to a logfile
329     if options.logfile:
330         handler = ClosingFileHandler(options.logfile)
331         handler.setFormatter(formatter)
332         handler.setLevel(logging.DEBUG)
333         root_logger.addHandler(handler)
334     else:
335         # The logfile wasn't specified. Add a stdout logger.
336         add_stdout = True
337
338     if options.verbose:
339         # Add a stdout logger as well in verbose mode
340         root_logger.setLevel(logging.DEBUG)
341         add_stdout = True
342     else:
343         root_logger.setLevel(logging.INFO)
344
345     if add_stdout:
346         stdout_handler = logging.StreamHandler(sys.stdout)
347         stdout_handler.setFormatter(formatter)
348         stdout_handler.setLevel(logging.DEBUG)
349         root_logger.addHandler(stdout_handler)
350
351
352 class ScriptRC(object):
353     """Enum for script return codes"""
354     SUCCESS = 0
355     FAILURE = 1
356     EXCEPTION = 2
357
358
359 class ScriptException(Exception):
360     pass
361
362
363 if __name__ == '__main__':
364     # Get the options from the user.
365     options = get_options()
366
367     # Setup logging using the user options
368     setup_logging(options)
369
370     # Run main script.
371     try:
372         rc = telnetserver(options)
373     except Exception as e:
374         log.exception(e)
375         rc = ScriptRC.EXCEPTION
376
377     if options.pidfile and os.path.isfile(options.pidfile):
378         os.unlink(options.pidfile)
379
380     log.info("Returning %d", rc)
381     sys.exit(rc)