2 # -*- coding: utf-8 -*-
4 # Project ___| | | | _ \| |
6 # | (__| |_| | _ <| |___
7 # \___|\___/|_| \_\_____|
9 # Copyright (C) 2017, Daniel Stenberg, <daniel@haxx.se>, et al.
11 # This software is licensed as described in the file COPYING, which
12 # you should have received as part of this distribution. The terms
13 # are also available at https://curl.haxx.se/docs/copyright.html.
15 # You may opt to use, copy, modify, merge, publish, distribute and/or sell
16 # copies of the Software, and permit persons to whom the Software is
17 # furnished to do so, under the terms of the COPYING file.
19 # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
20 # KIND, either express or implied.
22 """Server for testing SMB"""
24 from __future__ import (absolute_import, division, print_function)
33 # Import our curl test data helper
36 # This saves us having to set up the PYTHONPATH explicitly
37 deps_dir = os.path.join(os.path.dirname(__file__), "python_dependencies")
38 sys.path.append(deps_dir)
39 from impacket import smbserver as imp_smbserver
40 from impacket import smb as imp_smb
41 from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
44 log = logging.getLogger(__name__)
45 SERVER_MAGIC = "SERVER_MAGIC"
46 TESTS_MAGIC = "TESTS_MAGIC"
47 VERIFIED_REQ = "verifiedserver"
48 VERIFIED_RSP = b"WE ROOLZ: {pid}\n"
51 def smbserver(options):
52 """Start up a TCP SMB server that serves forever
57 with open(options.pidfile, "w") as f:
58 f.write("{0}".format(pid))
60 # Here we write a mini config for the server
61 smb_config = ConfigParser.ConfigParser()
62 smb_config.add_section("global")
63 smb_config.set("global", "server_name", "SERVICE")
64 smb_config.set("global", "server_os", "UNIX")
65 smb_config.set("global", "server_domain", "WORKGROUP")
66 smb_config.set("global", "log_file", "")
67 smb_config.set("global", "credentials_file", "")
69 # We need a share which allows us to test that the server is running
70 smb_config.add_section("SERVER")
71 smb_config.set("SERVER", "comment", "server function")
72 smb_config.set("SERVER", "read only", "yes")
73 smb_config.set("SERVER", "share type", "0")
74 smb_config.set("SERVER", "path", SERVER_MAGIC)
76 # Have a share for tests. These files will be autogenerated from the
78 smb_config.add_section("TESTS")
79 smb_config.set("TESTS", "comment", "tests")
80 smb_config.set("TESTS", "read only", "yes")
81 smb_config.set("TESTS", "share type", "0")
82 smb_config.set("TESTS", "path", TESTS_MAGIC)
84 if not options.srcdir or not os.path.isdir(options.srcdir):
85 raise ScriptException("--srcdir is mandatory")
87 test_data_dir = os.path.join(options.srcdir, "data")
89 smb_server = TestSmbServer(("127.0.0.1", options.port),
90 config_parser=smb_config,
91 test_data_directory=test_data_dir)
92 log.info("[SMB] setting up SMB server on port %s", options.port)
93 smb_server.processConfigFile()
94 smb_server.serve_forever()
98 class TestSmbServer(imp_smbserver.SMBSERVER):
100 Test server for SMB which subclasses the impacket SMBSERVER and provides
107 test_data_directory=None):
108 imp_smbserver.SMBSERVER.__init__(self,
110 config_parser=config_parser)
112 # Set up a test data object so we can get test data later.
113 self.ctd = curl_test_data.TestData(test_data_directory)
115 # Override smbComNtCreateAndX so we can pretend to have files which
117 self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
120 def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
122 Our version of smbComNtCreateAndX looks for special test files and
123 fools the rest of the framework into opening them as if they were
126 conn_data = smb_server.getConnectionData(conn_id)
128 # Wrap processing in a try block which allows us to throw SmbException
129 # to control the flow.
131 ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
132 smb_command["Parameters"])
134 path = self.get_share_path(conn_data,
135 ncax_parms["RootFid"],
137 log.info("[SMB] Requested share path: %s", path)
139 disposition = ncax_parms["Disposition"]
140 log.debug("[SMB] Requested disposition: %s", disposition)
142 # Currently we only support reading files.
143 if disposition != imp_smb.FILE_OPEN:
144 raise SmbException(STATUS_ACCESS_DENIED,
145 "Only support reading files")
147 # Check to see if the path we were given is actually a
148 # magic path which needs generating on the fly.
149 if path not in [SERVER_MAGIC, TESTS_MAGIC]:
150 # Pass the command onto the original handler.
151 return imp_smbserver.SMBCommands.smbComNtCreateAndX(conn_id,
156 flags2 = recv_packet["Flags2"]
157 ncax_data = imp_smb.SMBNtCreateAndX_Data(flags=flags2,
160 requested_file = imp_smbserver.decodeSMBString(
162 ncax_data["FileName"])
163 log.debug("[SMB] User requested file '%s'", requested_file)
165 if path == SERVER_MAGIC:
166 fid, full_path = self.get_server_path(requested_file)
168 assert (path == TESTS_MAGIC)
169 fid, full_path = self.get_test_path(requested_file)
171 resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
174 # Simple way to generate a fid
175 if len(conn_data["OpenedFiles"]) == 0:
178 fakefid = conn_data["OpenedFiles"].keys()[-1] + 1
179 resp_parms["Fid"] = fakefid
180 resp_parms["CreateAction"] = disposition
182 if os.path.isdir(path):
184 "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
185 resp_parms["IsDirectory"] = 1
187 resp_parms["IsDirectory"] = 0
188 resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]
190 # Get this file's information
191 resp_info, error_code = imp_smbserver.queryPathInformation(
192 "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
194 if error_code != STATUS_SUCCESS:
195 raise SmbException(error_code, "Failed to query path info")
197 resp_parms["CreateTime"] = resp_info["CreationTime"]
198 resp_parms["LastAccessTime"] = resp_info[
200 resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
201 resp_parms["LastChangeTime"] = resp_info[
203 resp_parms["FileAttributes"] = resp_info[
205 resp_parms["AllocationSize"] = resp_info[
207 resp_parms["EndOfFile"] = resp_info["EndOfFile"]
209 # Let's store the fid for the connection
210 # smbServer.log("Create file %s, mode:0x%x" % (pathName, mode))
211 conn_data["OpenedFiles"][fakefid] = {}
212 conn_data["OpenedFiles"][fakefid]["FileHandle"] = fid
213 conn_data["OpenedFiles"][fakefid]["FileName"] = path
214 conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
216 except SmbException as s:
217 log.debug("[SMB] SmbException hit: %s", s)
218 error_code = s.error_code
222 resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
223 resp_cmd["Parameters"] = resp_parms
224 resp_cmd["Data"] = resp_data
225 smb_server.setConnectionData(conn_id, conn_data)
227 return [resp_cmd], None, error_code
229 def get_share_path(self, conn_data, root_fid, tid):
230 conn_shares = conn_data["ConnectedShares"]
232 if tid in conn_shares:
234 # If we have a rootFid, the path is relative to that fid
235 path = conn_data["OpenedFiles"][root_fid]["FileName"]
236 log.debug("RootFid present %s!" % path)
238 if "path" in conn_shares[tid]:
239 path = conn_shares[tid]["path"]
241 raise SmbException(STATUS_ACCESS_DENIED,
242 "Connection share had no path")
244 raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
249 def get_server_path(self, requested_filename):
250 log.debug("[SMB] Get server path '%s'", requested_filename)
252 if requested_filename not in [VERIFIED_REQ]:
253 raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
255 fid, filename = tempfile.mkstemp()
256 log.debug("[SMB] Created %s (%d) for storing '%s'",
257 filename, fid, requested_filename)
261 if requested_filename == VERIFIED_REQ:
262 log.debug("[SMB] Verifying server is alive")
263 contents = VERIFIED_RSP.format(pid=os.getpid())
265 self.write_to_fid(fid, contents)
268 def write_to_fid(self, fid, contents):
269 # Write the contents to file descriptor
270 os.write(fid, contents)
273 # Rewind the file to the beginning so a read gets us the contents
274 os.lseek(fid, 0, os.SEEK_SET)
276 def get_test_path(self, requested_filename):
277 log.info("[SMB] Get reply data from 'test%s'", requested_filename)
279 fid, filename = tempfile.mkstemp()
280 log.debug("[SMB] Created %s (%d) for storing test '%s'",
281 filename, fid, requested_filename)
284 contents = self.ctd.get_test_data(requested_filename)
285 self.write_to_fid(fid, contents)
289 log.exception("Failed to make test file")
290 raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
293 class SmbException(Exception):
294 def __init__(self, error_code, error_message):
295 super(SmbException, self).__init__(error_message)
296 self.error_code = error_code
299 class ScriptRC(object):
300 """Enum for script return codes"""
306 class ScriptException(Exception):
311 parser = argparse.ArgumentParser()
313 parser.add_argument("--port", action="store", default=9017,
314 type=int, help="port to listen on")
315 parser.add_argument("--verbose", action="store", type=int, default=0,
316 help="verbose output")
317 parser.add_argument("--pidfile", action="store",
318 help="file name for the PID")
319 parser.add_argument("--logfile", action="store",
320 help="file name for the log")
321 parser.add_argument("--srcdir", action="store", help="test directory")
322 parser.add_argument("--id", action="store", help="server ID")
323 parser.add_argument("--ipv4", action="store_true", default=0,
326 return parser.parse_args()
329 def setup_logging(options):
331 Set up logging from the command line options
333 root_logger = logging.getLogger()
336 formatter = logging.Formatter("%(asctime)s %(levelname)-5.5s %(message)s")
338 # Write out to a logfile
340 handler = logging.FileHandler(options.logfile, mode="w")
341 handler.setFormatter(formatter)
342 handler.setLevel(logging.DEBUG)
343 root_logger.addHandler(handler)
345 # The logfile wasn't specified. Add a stdout logger.
349 # Add a stdout logger as well in verbose mode
350 root_logger.setLevel(logging.DEBUG)
353 root_logger.setLevel(logging.INFO)
356 stdout_handler = logging.StreamHandler(sys.stdout)
357 stdout_handler.setFormatter(formatter)
358 stdout_handler.setLevel(logging.DEBUG)
359 root_logger.addHandler(stdout_handler)
362 if __name__ == '__main__':
363 # Get the options from the user.
364 options = get_options()
366 # Setup logging using the user options
367 setup_logging(options)
371 rc = smbserver(options)
372 except Exception as e:
374 rc = ScriptRC.EXCEPTION
376 log.info("[SMB] Returning %d", rc)