Basic Python HTTP Server The 2019 Stack Overflow Developer Survey Results Are In Announcing the arrival of Valued Associate #679: Cesar Manara Planned maintenance scheduled April 17/18, 2019 at 00:00UTC (8:00pm US/Eastern)Node.JS HTTP server displaying GoogleConcise HTTP serverPython HTTP ServerNodeJS static file HTTP server“Two way” HTTP ClientBasic multithread server codeBasic web server in Python 3Very basic C++ HTTP ParserNode.js HTTP server with request loggingA simple HTTP server implementation in Java
Why are PDP-7-style microprogrammed instructions out of vogue?
Word to describe a time interval
Huge performance difference of the command find with and without using %M option to show permissions
How did the crowd guess the pentatonic scale in Bobby McFerrin's presentation?
Working through the single responsibility principle (SRP) in Python when calls are expensive
Make it rain characters
How to make Illustrator type tool selection automatically adapt with text length
Do I have Disadvantage attacking with an off-hand weapon?
For what reasons would an animal species NOT cross a *horizontal* land bridge?
Can a flute soloist sit?
How do I design a circuit to convert a 100 mV and 50 Hz sine wave to a square wave?
What force causes entropy to increase?
Windows 10: How to Lock (not sleep) laptop on lid close?
Is there a way to generate uniformly distributed points on a sphere from a fixed amount of random real numbers per point?
"is" operation returns false even though two objects have same id
60's-70's movie: home appliances revolting against the owners
Why can't wing-mounted spoilers be used to steepen approaches?
how can a perfect fourth interval be considered either consonant or dissonant?
Circular reasoning in L'Hopital's rule
How to handle characters who are more educated than the author?
Is every episode of "Where are my Pants?" identical?
How do you keep chess fun when your opponent constantly beats you?
Mortgage adviser recommends a longer term than necessary combined with overpayments
Why did Peik Lin say, "I'm not an animal"?
Basic Python HTTP Server
The 2019 Stack Overflow Developer Survey Results Are In
Announcing the arrival of Valued Associate #679: Cesar Manara
Planned maintenance scheduled April 17/18, 2019 at 00:00UTC (8:00pm US/Eastern)Node.JS HTTP server displaying GoogleConcise HTTP serverPython HTTP ServerNodeJS static file HTTP server“Two way” HTTP ClientBasic multithread server codeBasic web server in Python 3Very basic C++ HTTP ParserNode.js HTTP server with request loggingA simple HTTP server implementation in Java
.everyoneloves__top-leaderboard:empty,.everyoneloves__mid-leaderboard:empty,.everyoneloves__bot-mid-leaderboard:empty margin-bottom:0;
$begingroup$
It's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'address[0] unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from client_data[1][0]')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 code message']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'header: content']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
def get_type(self, file_name):
return filetype.guess('./public/'+file_name)
def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name
def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()
if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return
try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)
def get_request_address(self):
return self.address
def parse_headers(self, headers):
t =
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t
def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']
self.calculate_content_length()
message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'
try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass
def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]
if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default
func(headers, contents)
self.reply()
def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')
def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')
def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')
def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')
def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')
def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')
def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')
def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')
def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')
def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
import sys
class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'request_address -> headers["Host"]/file_name')
self.send_file()
def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80
try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'e')
run()
python python-3.x http server
$endgroup$
add a comment |
$begingroup$
It's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'address[0] unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from client_data[1][0]')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 code message']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'header: content']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
def get_type(self, file_name):
return filetype.guess('./public/'+file_name)
def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name
def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()
if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return
try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)
def get_request_address(self):
return self.address
def parse_headers(self, headers):
t =
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t
def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']
self.calculate_content_length()
message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'
try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass
def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]
if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default
func(headers, contents)
self.reply()
def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')
def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')
def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')
def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')
def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')
def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')
def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')
def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')
def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')
def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
import sys
class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'request_address -> headers["Host"]/file_name')
self.send_file()
def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80
try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'e')
run()
python python-3.x http server
$endgroup$
add a comment |
$begingroup$
It's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'address[0] unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from client_data[1][0]')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 code message']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'header: content']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
def get_type(self, file_name):
return filetype.guess('./public/'+file_name)
def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name
def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()
if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return
try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)
def get_request_address(self):
return self.address
def parse_headers(self, headers):
t =
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t
def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']
self.calculate_content_length()
message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'
try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass
def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]
if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default
func(headers, contents)
self.reply()
def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')
def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')
def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')
def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')
def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')
def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')
def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')
def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')
def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')
def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
import sys
class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'request_address -> headers["Host"]/file_name')
self.send_file()
def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80
try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'e')
run()
python python-3.x http server
$endgroup$
It's an extremely simply HTTP server in Python using the socket library, and a few others to get the MIME type, etc...
I've also avoided the ../../
vulnerability, although some of the code in the send_file
function seems a bit weak.
It should also be PEP8 compliant, aside from maybe some trailing whitespace in comments.
import filetype
import socket
import _thread
class ServerSocket():
''' Recieves connections, parses the request and ships it off to a handler
'''
def __init__(self, address, handler=None, *args, **kwargs):
''' Creates a server socket and defines a handler for the server
'''
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(address)
self.handler_args = args
self.handler_kwargs = kwargs
if handler:
self.handler = handler # The custom handler
else:
self.handler = Handler # The default handler
def initialise(self, open_connections=5):
''' Initilises the server socket and has it listen for connections
'''
self.socket.listen(open_connections)
self.listen()
def parse(self, data):
''' Splits a packet into
the request,
the headers (which includes the request),
and contents
'''
stringed = str(data, 'utf-8')
split = stringed.split('rnrn')
headers = split[0]
if len(split) > 1:
content = split[1]
else:
content = []
request = headers.split(' ')[0]
return request, headers, content
def handle(self, client, address):
''' Parses the data and handles the request. It then closes the connection
'''
try:
data = client.recv(1024)
except ConnectionResetError:
if self.handler_kwargs["logging"] is True:
print(f'address[0] unexpectedly quit')
client.close()
return
parsed = self.parse(data)
handler = self.handler(self.handler_args, self.handler_kwargs)
handler.handle(client, parsed, address)
client.close()
def listen(self):
''' Listens until a keyboard interrupt and handles each connection in a
new thread
'''
try:
while True:
client_data = self.socket.accept()
if self.handler_kwargs['logging'] is True:
print(f'Connection from client_data[1][0]')
_thread.start_new_thread(self.handle, client_data)
except KeyboardInterrupt:
self.socket.close()
class Handler():
''' Handles requests from the Server Socket
'''
def __init__(self, args, kwargs):
self.args = args
self.kwargs = kwargs
def set_status(self, code, message):
''' Used to add a status line:
- 'HTTP/1.1 200 OK' or 'HTTP/1.1 404 Not Found', etc...
'''
self.reply_headers = [f'HTTP/1.0 code message']
def set_header(self, header, content):
''' Defines a custom header and adds it to the response
'''
self.reply_headers += [f'header: content']
def response(self, content):
''' Adds to the content of the response
'''
if type(content) == str:
self.reply_content += content.split('n')
else:
self.reply_content += [content]
def calculate_content_length(self):
''' Calculates the content length and adds it to the header
'''
length = len(self.reply_content) * 2
lengths = [len(line) for line in self.reply_content]
length += sum(lengths)
self.set_header('Content-Length', length)
def get_type(self, file_name):
return filetype.guess('./public/'+file_name)
def extract_file_name(self, file_name=None):
if file_name:
f_name = file_name[1:]
else:
f_name = self.request_status.split(' ')[1][1:]
return f_name
def send_file(self, file_name=None):
if file_name is None:
file_name = self.extract_file_name()
if file_name == '':
file_name = 'index.html'
elif file_name[0] in './':
self.set_status(403, "Forbidden")
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 403: Forbidden</p>']
return
try:
with open('./public/'+file_name, 'rb') as file:
file_contents = file.read()
except FileNotFoundError:
self.set_status(404, 'Not Found')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Error 404: File not found</p>']
return
file_type = self.get_type(file_name)
if file_type is not None:
self.set_header('Content-Type', file_type.MIME)
elif file_name.split('.')[-1] == 'html':
self.set_header('Content-Type', 'text/html')
else:
self.set_header('Content-Type', 'text/txt')
self.response(file_contents)
def get_request_address(self):
return self.address
def parse_headers(self, headers):
t =
for header in headers[1:]:
t[header.split(': ')[0]] = header.split(': ')[1]
return t
def reply(self):
''' Assembles the response and sends it to the client
'''
if self.reply_headers[0][0:4] != "HTTP":
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.reply_content = ['<p>Response Status unspecified</p>']
self.calculate_content_length()
message = 'rn'.join(self.reply_headers)
message += 'rnrn'
try:
message += 'rn'.join(self.reply_content)
message += 'rn'
except TypeError:
message = bytes(message, 'utf-8')
message += b'rn'.join(self.reply_content)
message += b'rn'
try:
if type(message) == str:
self.client.send(bytes(message, 'utf-8'))
else:
self.client.send(message)
except:
pass
def handle(self, client, parsed_data, address):
''' Initialises variables and case-switches the request type to
determine the handler function
'''
self.client = client
self.address = address
self.reply_headers = []
self.reply_content = []
self.headers = True
self.request_status = parsed_data[1].split('rn')[0]
request = parsed_data[0]
headers = self.parse_headers(parsed_data[1].split('rn'))
contents = parsed_data[2]
if request == "GET":
func = self.get
elif request == "POST":
func = self.post
elif request == "HEAD":
func = self.head
elif request == "PUT":
func = self.put
elif request == "DELETE":
func = self.delete
elif request == "CONNECT":
func = self.connect
elif request == "OPTIONS":
func = self.options
elif request == "TRACE":
func = self.trace
elif request == "PATCH":
func = self.patch
else:
func = self.default
func(headers, contents)
self.reply()
def default(self, headers, contents):
''' If the request is not known, defaults to this
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Unknown Request Type</p>''')
def get(self, headers, contents):
''' Overwrite to customly handle GET requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a GET Request</p>''')
def post(self, headers, contents):
''' Overwrite to customly handle POST requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a POST Request</p>''')
def head(self, headers, contents):
''' Overwrite to customly handle HEAD requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a HEAD Request</p>''')
def put(self, headers, contents):
''' Overwrite to customly handle PUT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PUT Request</p>''')
def delete(self, headers, contents):
''' Overwrite to customly handle DELETE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a DELETE Request</p>''')
def connect(self, headers, contents):
''' Overwrite to customly handle CONNECT requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a CONNECT Request</p>''')
def options(self, headers, contents):
''' Overwrite to customly handle OPTIONS requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got an OPTIONS Request</p>''')
def trace(self, headers, contents):
''' Overwrite to customly handle TRACE requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a TRACE Request</p>''')
def patch(self, headers, contents):
''' Overwrite to customly handle PATCH requests
'''
self.set_status(200, 'OK')
self.set_header('Content-Type', 'text/html')
self.response('''<p>Successfully got a PATCH Request</p>''')
# =======================================================================
if __name__ == "__main__":
import sys
class CustomHandler(Handler):
def get(self, headers, contents):
self.set_status(200, 'OK')
request_address = self.get_request_address()[0]
file_name = self.extract_file_name()
print(f'request_address -> headers["Host"]/file_name')
self.send_file()
def run():
if len(sys.argv) == 2:
port = int(sys.argv[1])
else:
port = 80
try:
print('Initialising...', end='')
http_server = ServerSocket(
('0.0.0.0', port),
CustomHandler,
logging=True
)
print('Done')
http_server.initialise()
except Exception as e:
print(f'e')
run()
python python-3.x http server
python python-3.x http server
edited 6 mins ago
200_success
131k17157422
131k17157422
asked 11 mins ago
LForchiniLForchini
535
535
add a comment |
add a comment |
0
active
oldest
votes
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "196"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: false,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: null,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f217358%2fbasic-python-http-server%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
0
active
oldest
votes
0
active
oldest
votes
active
oldest
votes
active
oldest
votes
Thanks for contributing an answer to Code Review Stack Exchange!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
Use MathJax to format equations. MathJax reference.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fcodereview.stackexchange.com%2fquestions%2f217358%2fbasic-python-http-server%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
var $window = $(window),
onScroll = function(e)
var $elem = $('.new-login-left'),
docViewTop = $window.scrollTop(),
docViewBottom = docViewTop + $window.height(),
elemTop = $elem.offset().top,
elemBottom = elemTop + $elem.height();
if ((docViewTop elemBottom))
StackExchange.using('gps', function() StackExchange.gps.track('embedded_signup_form.view', location: 'question_page' ); );
$window.unbind('scroll', onScroll);
;
$window.on('scroll', onScroll);
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown