Disable a debug option
[platform/upstream/curl.git] / tests / dictserver.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #***************************************************************************
4 #                                  _   _ ____  _
5 #  Project                     ___| | | |  _ \| |
6 #                             / __| | | | |_) | |
7 #                            | (__| |_| |  _ <| |___
8 #                             \___|\___/|_| \_\_____|
9 #
10 # Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
11 #
12 # This software is licensed as described in the file COPYING, which
13 # you should have received as part of this distribution. The terms
14 # are also available at https://curl.se/docs/copyright.html.
15 #
16 # You may opt to use, copy, modify, merge, publish, distribute and/or sell
17 # copies of the Software, and permit persons to whom the Software is
18 # furnished to do so, under the terms of the COPYING file.
19 #
20 # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
21 # KIND, either express or implied.
22 #
23 # SPDX-License-Identifier: curl
24 #
25 ###########################################################################
26 #
27 """ DICT server """
28
29 from __future__ import (absolute_import, division, print_function,
30                         unicode_literals)
31
32 import argparse
33 import logging
34 import os
35 import sys
36
37 from util import ClosingFileHandler
38
39 try:  # Python 2
40     import SocketServer as socketserver
41 except ImportError:  # Python 3
42     import socketserver
43
44 log = logging.getLogger(__name__)
45 HOST = "localhost"
46
47 # The strings that indicate the test framework is checking our aliveness
48 VERIFIED_REQ = b"verifiedserver"
49 VERIFIED_RSP = "WE ROOLZ: {pid}"
50
51
52 def dictserver(options):
53     """
54     Starts up a TCP server with a DICT handler and serves DICT requests
55     forever.
56     """
57     if options.pidfile:
58         pid = os.getpid()
59         # see tests/server/util.c function write_pidfile
60         if os.name == "nt":
61             pid += 65536
62         with open(options.pidfile, "w") as f:
63             f.write(str(pid))
64
65     local_bind = (options.host, options.port)
66     log.info("[DICT] Listening on %s", local_bind)
67
68     # Need to set the allow_reuse on the class, not on the instance.
69     socketserver.TCPServer.allow_reuse_address = True
70     server = socketserver.TCPServer(local_bind, DictHandler)
71     server.serve_forever()
72
73     return ScriptRC.SUCCESS
74
75
76 class DictHandler(socketserver.BaseRequestHandler):
77     """Handler class for DICT connections.
78
79     """
80     def handle(self):
81         """
82         Simple function which responds to all queries with a 552.
83         """
84         try:
85             # First, send a response to allow the server to continue.
86             rsp = "220 dictserver <xnooptions> <msgid@msgid>\n"
87             self.request.sendall(rsp.encode("utf-8"))
88
89             # Receive the request.
90             data = self.request.recv(1024).strip()
91             log.debug("[DICT] Incoming data: %r", data)
92
93             if VERIFIED_REQ in data:
94                 log.debug("[DICT] Received verification request from test "
95                           "framework")
96                 pid = os.getpid()
97                 # see tests/server/util.c function write_pidfile
98                 if os.name == "nt":
99                     pid += 65536
100                 response_data = VERIFIED_RSP.format(pid=pid)
101             else:
102                 log.debug("[DICT] Received normal request")
103                 response_data = "No matches"
104
105             # Send back a failure to find.
106             response = "552 {0}\n".format(response_data)
107             log.debug("[DICT] Responding with %r", response)
108             self.request.sendall(response.encode("utf-8"))
109
110         except IOError:
111             log.exception("[DICT] IOError hit during request")
112
113
114 def get_options():
115     parser = argparse.ArgumentParser()
116
117     parser.add_argument("--port", action="store", default=9016,
118                         type=int, help="port to listen on")
119     parser.add_argument("--host", action="store", default=HOST,
120                         help="host to listen on")
121     parser.add_argument("--verbose", action="store", type=int, default=0,
122                         help="verbose output")
123     parser.add_argument("--pidfile", action="store",
124                         help="file name for the PID")
125     parser.add_argument("--logfile", action="store",
126                         help="file name for the log")
127     parser.add_argument("--srcdir", action="store", help="test directory")
128     parser.add_argument("--id", action="store", help="server ID")
129     parser.add_argument("--ipv4", action="store_true", default=0,
130                         help="IPv4 flag")
131
132     return parser.parse_args()
133
134
135 def setup_logging(options):
136     """
137     Set up logging from the command line options
138     """
139     root_logger = logging.getLogger()
140     add_stdout = False
141
142     formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
143
144     # Write out to a logfile
145     if options.logfile:
146         handler = ClosingFileHandler(options.logfile)
147         handler.setFormatter(formatter)
148         handler.setLevel(logging.DEBUG)
149         root_logger.addHandler(handler)
150     else:
151         # The logfile wasn't specified. Add a stdout logger.
152         add_stdout = True
153
154     if options.verbose:
155         # Add a stdout logger as well in verbose mode
156         root_logger.setLevel(logging.DEBUG)
157         add_stdout = True
158     else:
159         root_logger.setLevel(logging.INFO)
160
161     if add_stdout:
162         stdout_handler = logging.StreamHandler(sys.stdout)
163         stdout_handler.setFormatter(formatter)
164         stdout_handler.setLevel(logging.DEBUG)
165         root_logger.addHandler(stdout_handler)
166
167
168 class ScriptRC(object):
169     """Enum for script return codes"""
170     SUCCESS = 0
171     FAILURE = 1
172     EXCEPTION = 2
173
174
175 class ScriptException(Exception):
176     pass
177
178
179 if __name__ == '__main__':
180     # Get the options from the user.
181     options = get_options()
182
183     # Setup logging using the user options
184     setup_logging(options)
185
186     # Run main script.
187     try:
188         rc = dictserver(options)
189     except Exception as e:
190         log.exception(e)
191         rc = ScriptRC.EXCEPTION
192
193     if options.pidfile and os.path.isfile(options.pidfile):
194         os.unlink(options.pidfile)
195
196     log.info("[DICT] Returning %d", rc)
197     sys.exit(rc)