#!/usr/bin/env python3
# This is a simple HTTP server based on the HTTPServer and
# SimpleHTTPRequestHandler. It has been extended with PUT
# and DELETE functionality to store or delete results.
#
# See: https://github.com/python/cpython/blob/main/Lib/http/server.py

import argparse
import base64
from functools import partial
import hmac
from http import HTTPStatus
from http.server import HTTPServer, SimpleHTTPRequestHandler
import os
import signal
import socket
import sys

# Upload bodies are copied to disk in pieces of this size so that a large
# PUT never has to be held in memory as a whole.
_COPY_CHUNK_SIZE = 64 * 1024


class AuthenticationError(Exception):
    """Raised when a request lacks valid Basic credentials."""


class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler):
    """File-serving request handler that also accepts PUT and DELETE.

    When ``basic_auth`` ("user:pass") is given, every request must carry a
    matching ``Authorization: Basic ...`` header; otherwise it is answered
    with 401 and nothing on disk is touched.
    """

    def __init__(self, *args, basic_auth=None, **kwargs):
        """Store the expected credential token, then handle the request.

        Args:
            basic_auth: Optional "user:pass" string. Falsy disables auth.
        """
        # The attribute must exist before super().__init__ runs, because the
        # socketserver base class processes the request in its constructor.
        self.basic_auth = None
        if basic_auth:
            # UTF-8 yields the same token as ASCII for ASCII credentials but
            # does not raise for credentials containing other characters.
            self.basic_auth = base64.b64encode(
                basic_auth.encode("utf-8")
            ).decode("ascii")
        super().__init__(*args, **kwargs)

    def do_GET(self):
        """Serve a GET request after checking authentication."""
        if self._authenticated():
            super().do_GET()

    def do_HEAD(self):
        """Serve a HEAD request after checking authentication."""
        if self._authenticated():
            super().do_HEAD()

    def do_PUT(self):
        """Store the request body at the translated path.

        Responds 201 on success, 401 without valid credentials, 411 when
        Content-Length is missing, 400 when it is not a non-negative
        integer, and 500 when the file cannot be written.
        """
        # Authenticate before any filesystem side effect (directory creation).
        if not self._authenticated():
            return

        length_header = self.headers.get("Content-Length")
        if length_header is None:
            self.send_error(
                HTTPStatus.LENGTH_REQUIRED, "Content-Length header is required"
            )
            return
        try:
            file_length = int(length_header)
        except ValueError:
            file_length = -1
        if file_length < 0:
            # A negative length would make rfile.read() wait for EOF.
            self.send_error(HTTPStatus.BAD_REQUEST, "Invalid Content-Length")
            return

        path = self.translate_path(self.path)
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "wb") as output_file:
                remaining = file_length
                while remaining > 0:
                    chunk = self.rfile.read(min(remaining, _COPY_CHUNK_SIZE))
                    if not chunk:
                        # Client stopped sending early; keep what arrived.
                        break
                    output_file.write(chunk)
                    remaining -= len(chunk)
        except OSError:
            self.send_error(
                HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot open file for writing"
            )
            return

        self.send_response(HTTPStatus.CREATED)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_DELETE(self):
        """Delete the file at the translated path.

        Responds 200 on success, 401 without valid credentials, 404 when the
        file does not exist, and 500 for any other deletion failure.
        """
        if not self._authenticated():
            return

        path = self.translate_path(self.path)
        try:
            os.remove(path)
        except FileNotFoundError:
            self.send_error(HTTPStatus.NOT_FOUND, "File not found")
            return
        except OSError:
            self.send_error(
                HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file"
            )
            return

        self.send_response(HTTPStatus.OK)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _authenticated(self):
        """Return True if the request may proceed; otherwise send a 401."""
        try:
            self._handle_auth()
        except AuthenticationError:
            self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
            return False
        return True

    def _handle_auth(self):
        """Validate the Authorization header against the configured token.

        Returns silently when authentication is disabled or the credentials
        match.

        Raises:
            AuthenticationError: The header is missing, malformed, or wrong.
        """
        if not self.basic_auth:
            return
        authorization = self.headers.get("authorization")
        if authorization:
            parts = authorization.split()
            if len(parts) == 2:
                scheme, credentials = parts
                # Auth scheme names are case-insensitive. The token itself is
                # compared in constant time, on bytes, because compare_digest
                # rejects non-ASCII str arguments.
                if scheme.lower() == "basic" and hmac.compare_digest(
                    credentials.encode("utf-8", "replace"),
                    self.basic_auth.encode("ascii"),
                ):
                    return
        raise AuthenticationError("Authentication required")


def _get_best_family(*address):
    """Return ``(family, sockaddr)`` of the first getaddrinfo() result.

    Args:
        *address: Host and port, passed straight to ``socket.getaddrinfo``.
    """
    infos = socket.getaddrinfo(
        *address,
        type=socket.SOCK_STREAM,
        flags=socket.AI_PASSIVE,
    )
    family, _socktype, _proto, _canonname, sockaddr = next(iter(infos))
    return family, sockaddr


def run(HandlerClass, ServerClass, port, bind):
    """Bind ``ServerClass`` to ``bind``/``port`` and serve until interrupted.

    Args:
        HandlerClass: Callable producing the request handler.
        ServerClass: Server class to instantiate (e.g. ``HTTPServer``).
        port: TCP port to listen on.
        bind: Address to bind to, or None for all interfaces.
    """
    # NOTE(review): when HandlerClass is a functools.partial (as built by
    # main() below), this assignment sets an attribute on the partial object,
    # not on the wrapped handler class, so responses keep using the class's
    # default protocol version. Left unchanged: applying HTTP/1.1 for real
    # would enable keep-alive on a single-threaded server.
    HandlerClass.protocol_version = "HTTP/1.1"
    ServerClass.address_family, addr = _get_best_family(bind, port)

    with ServerClass(addr, HandlerClass) as httpd:
        host, port = httpd.socket.getsockname()[:2]
        # IPv6 literals must be bracketed inside a URL.
        url_host = f"[{host}]" if ":" in host else host
        print(
            f"Serving HTTP on {host} port {port} "
            f"(http://{url_host}:{port}/) ..."
        )
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\nKeyboard interrupt received, exiting.")
            sys.exit(0)


def on_terminate(signum, frame):
    """Signal handler: flush output and exit with status ``128 + signum``."""
    sys.stdout.flush()
    sys.stderr.flush()
    sys.exit(128 + signum)


def main():
    """Parse command-line arguments and start the server."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--basic-auth", "-B", help="Basic auth tuple like user:pass"
    )
    parser.add_argument(
        "--bind",
        "-b",
        metavar="ADDRESS",
        help="Specify alternate bind address " "[default: all interfaces]",
    )
    parser.add_argument(
        "--directory",
        "-d",
        default=os.getcwd(),
        help="Specify alternative directory " "[default:current directory]",
    )
    parser.add_argument(
        "port",
        action="store",
        default=8080,
        type=int,
        nargs="?",
        help="Specify alternate port [default: 8080]",
    )
    args = parser.parse_args()

    handler_class = partial(
        PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth
    )
    # The handler resolves request paths against the working directory.
    os.chdir(args.directory)

    signal.signal(signal.SIGINT, on_terminate)
    signal.signal(signal.SIGTERM, on_terminate)

    run(
        HandlerClass=handler_class,
        ServerClass=HTTPServer,
        port=args.port,
        bind=args.bind,
    )


if __name__ == "__main__":
    main()