Files correlati : Commento : Aggiornata cUrl alla ultima versione su Github: 7.56.1 git-svn-id: svn://10.65.10.50/branches/R_10_00@24203 c028cbd2-c16b-5b4b-a496-9718f37d4682
		
			
				
	
	
		
			378 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			378 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| # -*- coding: utf-8 -*-
 | |
| #
 | |
| #  Project                     ___| | | |  _ \| |
 | |
| #                             / __| | | | |_) | |
 | |
| #                            | (__| |_| |  _ <| |___
 | |
| #                             \___|\___/|_| \_\_____|
 | |
| #
 | |
| # Copyright (C) 2017, Daniel Stenberg, <daniel@haxx.se>, et al.
 | |
| #
 | |
| # This software is licensed as described in the file COPYING, which
 | |
| # you should have received as part of this distribution. The terms
 | |
| # are also available at https://curl.haxx.se/docs/copyright.html.
 | |
| #
 | |
| # You may opt to use, copy, modify, merge, publish, distribute and/or sell
 | |
| # copies of the Software, and permit persons to whom the Software is
 | |
| # furnished to do so, under the terms of the COPYING file.
 | |
| #
 | |
| # This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
 | |
| # KIND, either express or implied.
 | |
| #
 | |
| """Server for testing SMB"""
 | |
| 
 | |
| from __future__ import (absolute_import, division, print_function)
 | |
| # unicode_literals)
 | |
| import argparse
 | |
| import ConfigParser
 | |
| import os
 | |
| import sys
 | |
| import logging
 | |
| import tempfile
 | |
| 
 | |
| # Import our curl test data helper
 | |
| import curl_test_data
 | |
| 
 | |
| # This saves us having to set up the PYTHONPATH explicitly
 | |
| deps_dir = os.path.join(os.path.dirname(__file__), "python_dependencies")
 | |
| sys.path.append(deps_dir)
 | |
| from impacket import smbserver as imp_smbserver
 | |
| from impacket import smb as imp_smb
 | |
| from impacket.nt_errors import (STATUS_ACCESS_DENIED, STATUS_SUCCESS,
 | |
|                                 STATUS_NO_SUCH_FILE)
 | |
| 
 | |
| log = logging.getLogger(__name__)
 | |
| SERVER_MAGIC = "SERVER_MAGIC"
 | |
| TESTS_MAGIC = "TESTS_MAGIC"
 | |
| VERIFIED_REQ = "verifiedserver"
 | |
| VERIFIED_RSP = b"WE ROOLZ: {pid}\n"
 | |
| 
 | |
| 
 | |
def smbserver(options):
    """Run the SMB test server on 127.0.0.1 and serve until interrupted.

    When ``options.pidfile`` is set the current process id is written to it
    first.  A minimal in-memory configuration holding a ``global`` section
    and the two magic shares (SERVER and TESTS) is then built and handed to
    TestSmbServer together with ``<srcdir>/data``.

    Raises ScriptException when ``options.srcdir`` is unset or is not a
    directory.  Returns 0 if the serve loop ever returns.
    """
    if options.pidfile:
        with open(options.pidfile, "w") as pid_file:
            pid_file.write("{0}".format(os.getpid()))

    # Mini config for the server, kept as ordered (section, settings) pairs
    # so sections and options are registered in a fixed order.
    config_layout = (
        ("global", (
            ("server_name", "SERVICE"),
            ("server_os", "UNIX"),
            ("server_domain", "WORKGROUP"),
            ("log_file", ""),
            ("credentials_file", ""),
        )),
        # Share used to check that the server is running.
        ("SERVER", (
            ("comment", "server function"),
            ("read only", "yes"),
            ("share type", "0"),
            ("path", SERVER_MAGIC),
        )),
        # Share for tests; its files are autogenerated from the test input.
        ("TESTS", (
            ("comment", "tests"),
            ("read only", "yes"),
            ("share type", "0"),
            ("path", TESTS_MAGIC),
        )),
    )

    smb_config = ConfigParser.ConfigParser()
    for section_name, settings in config_layout:
        smb_config.add_section(section_name)
        for key, value in settings:
            smb_config.set(section_name, key, value)

    source_dir = options.srcdir
    if not (source_dir and os.path.isdir(source_dir)):
        raise ScriptException("--srcdir is mandatory")

    smb_server = TestSmbServer(
        ("127.0.0.1", options.port),
        config_parser=smb_config,
        test_data_directory=os.path.join(source_dir, "data"))
    log.info("[SMB] setting up SMB server on port %s", options.port)
    smb_server.processConfigFile()
    smb_server.serve_forever()
    return 0
 | |
| 
 | |
| 
 | |
class TestSmbServer(imp_smbserver.SMBSERVER):
    """
    Test server for SMB which subclasses the impacket SMBSERVER and provides
    test functionality.

    Opens on a share whose configured path is SERVER_MAGIC or TESTS_MAGIC
    are answered from temporary files generated on demand; every other open
    is passed to impacket's own NT_CREATE_ANDX handler.
    """

    def __init__(self,
                 address,
                 config_parser=None,
                 test_data_directory=None):
        """
        Initialise the impacket server with ``address`` and
        ``config_parser``, create the test data helper for
        ``test_data_directory`` and install the NT_CREATE_ANDX hook.
        """
        imp_smbserver.SMBSERVER.__init__(self,
                                         address,
                                         config_parser=config_parser)

        # Set up a test data object so we can get test data later.
        self.ctd = curl_test_data.TestData(test_data_directory)

        # Override smbComNtCreateAndX so we can pretend to have files which
        # don't exist.
        self.hookSmbCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX,
                            self.create_and_x)

    def create_and_x(self, conn_id, smb_server, smb_command, recv_packet):
        """
        Our version of smbComNtCreateAndX looks for special test files and
        fools the rest of the framework into opening them as if they were
        normal files.

        For a non-magic share path the result of impacket's original
        handler is returned unchanged.  Otherwise the return value is
        ``([response_command], None, error_code)``; when an SmbException
        was raised the response parameters and data are empty strings and
        ``error_code`` is the exception's code.
        """
        conn_data = smb_server.getConnectionData(conn_id)

        # Wrap processing in a try block which allows us to throw SmbException
        # to control the flow.
        try:
            ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
                smb_command["Parameters"])

            path = self.get_share_path(conn_data,
                                       ncax_parms["RootFid"],
                                       recv_packet["Tid"])
            log.info("[SMB] Requested share path: %s", path)

            disposition = ncax_parms["Disposition"]
            log.debug("[SMB] Requested disposition: %s", disposition)

            # Currently we only support reading files.
            if disposition != imp_smb.FILE_OPEN:
                raise SmbException(STATUS_ACCESS_DENIED,
                                   "Only support reading files")

            # Check to see if the path we were given is actually a
            # magic path which needs generating on the fly.
            if path not in (SERVER_MAGIC, TESTS_MAGIC):
                # Pass the command onto the original handler.
                return imp_smbserver.SMBCommands.smbComNtCreateAndX(
                    conn_id, smb_server, smb_command, recv_packet)

            flags2 = recv_packet["Flags2"]
            ncax_data = imp_smb.SMBNtCreateAndX_Data(
                flags=flags2, data=smb_command["Data"])
            requested_file = imp_smbserver.decodeSMBString(
                flags2, ncax_data["FileName"])
            log.debug("[SMB] User requested file '%s'", requested_file)

            if path == SERVER_MAGIC:
                fid, full_path = self.get_server_path(requested_file)
            else:
                # The membership test above leaves TESTS_MAGIC as the only
                # remaining possibility.
                fid, full_path = self.get_test_path(requested_file)

            resp_parms = imp_smb.SMBNtCreateAndXResponse_Parameters()
            resp_data = ""

            # Generate a fid one above the highest fid currently open.
            # Using max() instead of "last key" keeps the result independent
            # of dict ordering, so a live fid can never be handed out twice.
            opened_files = conn_data["OpenedFiles"]
            fakefid = max(opened_files) + 1 if opened_files else 1
            resp_parms["Fid"] = fakefid
            resp_parms["CreateAction"] = disposition

            # Describe the generated file itself, not the magic share
            # marker it was requested through.
            if os.path.isdir(full_path):
                resp_parms[
                    "FileAttributes"] = imp_smb.SMB_FILE_ATTRIBUTE_DIRECTORY
                resp_parms["IsDirectory"] = 1
            else:
                resp_parms["IsDirectory"] = 0
                resp_parms["FileAttributes"] = ncax_parms["FileAttributes"]

            # Get this file's information
            resp_info, error_code = imp_smbserver.queryPathInformation(
                "", full_path, level=imp_smb.SMB_QUERY_FILE_ALL_INFO)

            if error_code != STATUS_SUCCESS:
                # The file will never be registered with the connection, so
                # nothing else would ever close or delete it.
                self._discard_temp_file(fid, full_path)
                raise SmbException(error_code, "Failed to query path info")

            resp_parms["CreateTime"] = resp_info["CreationTime"]
            resp_parms["LastAccessTime"] = resp_info["LastAccessTime"]
            resp_parms["LastWriteTime"] = resp_info["LastWriteTime"]
            resp_parms["LastChangeTime"] = resp_info["LastChangeTime"]
            resp_parms["FileAttributes"] = resp_info["ExtFileAttributes"]
            resp_parms["AllocationSize"] = resp_info["AllocationSize"]
            resp_parms["EndOfFile"] = resp_info["EndOfFile"]

            # Let's store the fid for the connection.  FileName keeps the
            # magic share path because get_share_path() reads it back when
            # a later request supplies this fid as its RootFid.
            opened_files[fakefid] = {
                "FileHandle": fid,
                "FileName": path,
                "DeleteOnClose": False,
            }

        except SmbException as s:
            log.debug("[SMB] SmbException hit: %s", s)
            error_code = s.error_code
            resp_parms = ""
            resp_data = ""

        resp_cmd = imp_smb.SMBCommand(imp_smb.SMB.SMB_COM_NT_CREATE_ANDX)
        resp_cmd["Parameters"] = resp_parms
        resp_cmd["Data"] = resp_data
        smb_server.setConnectionData(conn_id, conn_data)

        return [resp_cmd], None, error_code

    def get_share_path(self, conn_data, root_fid, tid):
        """
        Return the path the request is relative to.

        With a positive ``root_fid`` that is the FileName recorded for the
        opened file; otherwise it is the "path" entry of the connected share
        identified by ``tid``.

        Raises SmbException with STATUS_SMB_BAD_TID when ``tid`` is not a
        connected share, and with STATUS_ACCESS_DENIED when the share has no
        "path" entry.
        """
        conn_shares = conn_data["ConnectedShares"]

        if tid not in conn_shares:
            raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
                               "TID was invalid")

        if root_fid > 0:
            # If we have a rootFid, the path is relative to that fid.
            # NOTE(review): a root_fid that is not in OpenedFiles raises
            # KeyError here rather than SmbException.
            path = conn_data["OpenedFiles"][root_fid]["FileName"]
            log.debug("RootFid present %s!", path)
            return path

        share = conn_shares[tid]
        if "path" not in share:
            raise SmbException(STATUS_ACCESS_DENIED,
                               "Connection share had no path")
        return share["path"]

    def get_server_path(self, requested_filename):
        """
        Create the temporary file backing a request on the SERVER share.

        Only VERIFIED_REQ is served; its contents are VERIFIED_RSP formatted
        with the current process id.  Returns ``(fid, filename)`` as
        produced by tempfile.mkstemp(), with the descriptor rewound to the
        start.

        Raises SmbException with STATUS_NO_SUCH_FILE for any other name.
        """
        log.debug("[SMB] Get server path '%s'", requested_filename)

        # Reject unknown names before a temporary file gets created.
        if requested_filename != VERIFIED_REQ:
            raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")

        fid, filename = tempfile.mkstemp()
        log.debug("[SMB] Created %s (%d) for storing '%s'",
                  filename, fid, requested_filename)

        log.debug("[SMB] Verifying server is alive")
        contents = VERIFIED_RSP.format(pid=os.getpid())

        try:
            self.write_to_fid(fid, contents)
        except Exception:
            # Don't leave the descriptor and file behind; the original
            # error still propagates to the caller.
            self._discard_temp_file(fid, filename)
            raise
        return fid, filename

    def write_to_fid(self, fid, contents):
        """
        Write ``contents`` to the file descriptor ``fid``, sync it, and
        rewind so that a subsequent read starts at the beginning.
        """
        # Write the contents to file descriptor
        os.write(fid, contents)
        os.fsync(fid)

        # Rewind the file to the beginning so a read gets us the contents
        os.lseek(fid, 0, os.SEEK_SET)

    def get_test_path(self, requested_filename):
        """
        Create the temporary file backing a request on the TESTS share.

        The contents come from ``self.ctd.get_test_data(requested_filename)``.
        Returns ``(fid, filename)`` as produced by tempfile.mkstemp(), with
        the descriptor rewound to the start.

        Raises SmbException with STATUS_NO_SUCH_FILE when fetching or
        writing the data fails; the temporary file is removed in that case.
        """
        log.info("[SMB] Get reply data from 'test%s'", requested_filename)

        fid, filename = tempfile.mkstemp()
        log.debug("[SMB] Created %s (%d) for storing test '%s'",
                  filename, fid, requested_filename)

        try:
            contents = self.ctd.get_test_data(requested_filename)
            self.write_to_fid(fid, contents)
        except Exception:
            log.exception("Failed to make test file")
            self._discard_temp_file(fid, filename)
            raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")

        return fid, filename

    def _discard_temp_file(self, fid, filename):
        """
        Best-effort close of ``fid`` and removal of ``filename`` for a
        temporary file that will not be handed to the client.
        """
        try:
            os.close(fid)
        except OSError:
            log.debug("[SMB] Could not close descriptor %d", fid)
        try:
            os.remove(filename)
        except OSError:
            log.debug("[SMB] Could not remove %s", filename)
 | |
| 
 | |
| 
 | |
class SmbException(Exception):
    """Error raised to abort SMB request handling with a status code.

    ``error_code`` is stored on the instance; ``error_message`` becomes the
    exception message.
    """

    def __init__(self, error_code, error_message):
        Exception.__init__(self, error_message)
        self.error_code = error_code
 | |
| 
 | |
| 
 | |
class ScriptRC(object):
    """Exit codes the script can terminate with."""
    # SUCCESS = 0, FAILURE = 1, EXCEPTION = 2
    SUCCESS, FAILURE, EXCEPTION = range(3)
 | |
| 
 | |
| 
 | |
class ScriptException(Exception):
    """Error raised by the script itself, e.g. for invalid options."""
 | |
| 
 | |
| 
 | |
def get_options(argv=None):
    """Parse the command line options of the SMB test server.

    ``argv`` is the list of arguments to parse; when it is None (the
    default) argparse reads ``sys.argv[1:]``.

    Returns the argparse namespace with the attributes ``port``,
    ``verbose``, ``pidfile``, ``logfile``, ``srcdir``, ``id`` and ``ipv4``.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument("--port", action="store", default=9017,
                        type=int, help="port to listen on")
    parser.add_argument("--verbose", action="store", type=int, default=0,
                        help="verbose output")
    parser.add_argument("--pidfile", action="store",
                        help="file name for the PID")
    parser.add_argument("--logfile", action="store",
                        help="file name for the log")
    parser.add_argument("--srcdir", action="store", help="test directory")
    parser.add_argument("--id", action="store", help="server ID")
    # store_true yields a bool when the flag is given, so the default is a
    # bool as well (False == 0, existing comparisons keep working).
    parser.add_argument("--ipv4", action="store_true", default=False,
                        help="IPv4 flag")

    return parser.parse_args(argv)
 | |
| 
 | |
| 
 | |
def setup_logging(options):
    """
    Configure the root logger from the command line options.

    A file handler (truncating the file) is attached when ``options.logfile``
    is set.  A stdout handler is attached when no logfile was given or when
    ``options.verbose`` is truthy.  The root level is DEBUG in verbose mode
    and INFO otherwise; every handler itself accepts DEBUG and up.
    """
    root = logging.getLogger()
    line_format = logging.Formatter(
        "%(asctime)s %(levelname)-5.5s %(message)s")

    def attach(handler):
        # Shared set-up for every handler this function installs.
        handler.setFormatter(line_format)
        handler.setLevel(logging.DEBUG)
        root.addHandler(handler)

    wants_file = bool(options.logfile)
    verbose = bool(options.verbose)

    if wants_file:
        attach(logging.FileHandler(options.logfile, mode="w"))

    root.setLevel(logging.DEBUG if verbose else logging.INFO)

    # Stdout is used as the fallback without a logfile, and in addition to
    # the logfile in verbose mode.
    if verbose or not wants_file:
        attach(logging.StreamHandler(sys.stdout))
 | |
| 
 | |
| 
 | |
def _main():
    """Parse options, set up logging, run the server; return the exit code.

    Any exception escaping smbserver() is logged and mapped to
    ScriptRC.EXCEPTION.
    """
    options = get_options()
    setup_logging(options)

    try:
        rc = smbserver(options)
    except Exception as e:
        log.exception(e)
        rc = ScriptRC.EXCEPTION

    log.info("[SMB] Returning %d", rc)
    return rc


if __name__ == '__main__':
    sys.exit(_main())
 |