改造python3中的http.server为简单的文件下载服务

改造

修改python3中的http.server.SimpleHTTPRequestHandler,实现简单的文件上传下载服务

 simple_http_file_server.py:

# !/usr/bin/env python3

import datetime
import email
import html
import http.server
import io
import mimetypes
import os
import posixpath
import re
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
import socket
from http import HTTPStatus
import threading
import contextlib
 
__version__ = "0.1"
__all__ = ["MySimpleHTTPRequestHandler"]
 
class MySimpleHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
	server_version = "SimpleHTTP/" + __version__
	extensions_map = _encodings_map_default = {
		'.gz': 'application/gzip',
		'.Z': 'application/octet-stream',
		'.bz2': 'application/x-bzip2',
		'.xz': 'application/x-xz',
	}
 
	def __init__(self, *args, directory=None, **kwargs):
		if directory is None:
			directory = os.getcwd()
		self.directory = os.fspath(directory)
		super().__init__(*args, **kwargs)
 
	def do_GET(self):
		f = self.send_head()
		if f:
			try:
				self.copyfile(f, self.wfile)
			finally:
				f.close()
 
	def do_HEAD(self):
		f = self.send_head()
		if f:
			f.close()
 
	def send_head(self):
		path = self.translate_path(self.path)
		f = None
		if os.path.isdir(path):
			parts = urllib.parse.urlsplit(self.path)
			if not parts.path.endswith('/'):
				# redirect browser - doing basically what apache does
				self.send_response(HTTPStatus.MOVED_PERMANENTLY)
				new_parts = (parts[0], parts[1], parts[2] + '/',
							 parts[3], parts[4])
				new_url = urllib.parse.urlunsplit(new_parts)
				self.send_header("Location", new_url)
				self.end_headers()
				return None
			for index in "index.html", "index.htm":
				index = os.path.join(path, index)
				if os.path.exists(index):
					path = index
					break
			else:
				return self.list_directory(path)
 
		ctype = self.guess_type(path)
		if path.endswith("/"):
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			f = open(path, 'rb')
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "File not found")
			return None
		try:
			fs = os.fstat(f.fileno())
			# Use browser cache if possible
			if ("If-Modified-Since" in self.headers
					and "If-None-Match" not in self.headers):
				# compare If-Modified-Since and time of last file modification
				try:
					ims = email.utils.parsedate_to_datetime(self.headers["If-Modified-Since"])
				except (TypeError, IndexError, OverflowError, ValueError):
					# ignore ill-formed values
					pass
				else:
					if ims.tzinfo is None:
						# obsolete format with no timezone, cf.
						# https://tools.ietf.org/html/rfc7231#section-7.1.1.1
						ims = ims.replace(tzinfo=datetime.timezone.utc)
					if ims.tzinfo is datetime.timezone.utc:
						# compare to UTC datetime of last modification
						last_modif = datetime.datetime.fromtimestamp(
							fs.st_mtime, datetime.timezone.utc)
						# remove microseconds, like in If-Modified-Since
						last_modif = last_modif.replace(microsecond=0)
 
						if last_modif <= ims:
							self.send_response(HTTPStatus.NOT_MODIFIED)
							self.end_headers()
							f.close()
							return None
 
			self.send_response(HTTPStatus.OK)
			self.send_header("Content-type", ctype)
			self.send_header("Content-Length", str(fs[6]))
			self.send_header("Last-Modified",
							 self.date_time_string(fs.st_mtime))
			self.end_headers()
			return f
		except:
			f.close()
			raise
 
	def list_directory(self, path):
		try:
			list_dir = os.listdir(path)
		except OSError:
			self.send_error(HTTPStatus.NOT_FOUND, "No permission to list_dir directory")
			return None
		list_dir.sort(key=lambda a: a.lower())
		r = []
		try:
			display_path = urllib.parse.unquote(self.path, errors='surrogatepass')
		except UnicodeDecodeError:
			display_path = urllib.parse.unquote(path)
		display_path = html.escape(display_path, quote=False)
		enc = sys.getfilesystemencoding()
 
		form = """
			<h1>文件上传</h1>\n
			<form ENCTYPE="multipart/form-data" method="post">\n
				<input name="file" type="file"/>\n
				<input type="submit" value="upload"/>\n
			</form>\n"""
		title = 'Directory listing for %s' % display_path
		r.append('<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
				 '"http://www.w3.org/TR/html4/strict.dtd">')
		r.append('<html>\n<head>')
		r.append('<meta http-equiv="Content-Type" '
				 'content="text/html; charset=%s">' % enc)
		r.append('<title>%s</title>\n</head>' % title)
		r.append('<body>%s\n<h1>%s</h1>' % (form, title))
		r.append('<hr>\n<ul>')
		for name in list_dir:
			fullname = os.path.join(path, name)
			displayname = linkname = name
			# Append / for directories or @ for symbolic links
			if os.path.isdir(fullname):
				displayname = name + "/"
				linkname = name + "/"
			if os.path.islink(fullname):
				displayname = name + "@"
				# Note: a link to a directory displays with @ and links with /
			r.append('<li><a href="%s">%s</a></li>' % (urllib.parse.quote(linkname, errors='surrogatepass'),
													   html.escape(displayname, quote=False)))
		r.append('</ul>\n<hr>\n</body>\n</html>\n')
		encoded = '\n'.join(r).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		f.seek(0)
		self.send_response(HTTPStatus.OK)
		self.send_header("Content-type", "text/html; charset=%s" % enc)
		self.send_header("Content-Length", str(len(encoded)))
		self.end_headers()
		return f
 
	def translate_path(self, path):
		# abandon query parameters
		path = path.split('?', 1)[0]
		path = path.split('#', 1)[0]
		# Don't forget explicit trailing slash when normalizing. Issue17324
		trailing_slash = path.rstrip().endswith('/')
		try:
			path = urllib.parse.unquote(path, errors='surrogatepass')
		except UnicodeDecodeError:
			path = urllib.parse.unquote(path)
		path = posixpath.normpath(path)
		words = path.split('/')
		words = filter(None, words)
		path = self.directory
		for word in words:
			if os.path.dirname(word) or word in (os.curdir, os.pardir):
				# Ignore components that are not a simple file/directory name
				continue
			path = os.path.join(path, word)
		if trailing_slash:
			path += '/'
		return path
 
	def copyfile(self, source, outputfile):
		shutil.copyfileobj(source, outputfile)
 
	def guess_type(self, path):
		base, ext = posixpath.splitext(path)
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		ext = ext.lower()
		if ext in self.extensions_map:
			return self.extensions_map[ext]
		guess, _ = mimetypes.guess_type(path)
		if guess:
			return guess
		return 'application/octet-stream'
 
	def do_POST(self):
		r, info = self.deal_post_data()
		self.log_message('%s, %s => %s' % (r, info, self.client_address))
		enc = sys.getfilesystemencoding()
		res = [
			'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01//EN" '
			'"http://www.w3.org/TR/html4/strict.dtd">',
			'<html>\n<head>',
			'<meta http-equiv="Content-Type" content="text/html; charset=%s">' % enc,
			'<title>%s</title>\n</head>' % "Upload Result Page",
			'<body><h1>%s</h1>\n' % "Upload Result"
		]
		if r:
			res.append('<p>SUCCESS: %s</p>\n' % info)
		else:
			res.append('<p>FAILURE: %s</p>' % info)
		res.append('<a href=\"%s\">back</a>' % self.headers['referer'])
		res.append('</body></html>')
		encoded = '\n'.join(res).encode(enc, 'surrogate escape')
		f = io.BytesIO()
		f.write(encoded)
		length = f.tell()
		f.seek(0)
		self.send_response(200)
		self.send_header("Content-type", "text/html")
		self.send_header("Content-Length", str(length))
		self.end_headers()
		if f:
			self.copyfile(f, self.wfile)
			f.close()
 
	def deal_post_data(self):
		content_type = self.headers['content-type']
		if not content_type:
			return False, "Content-Type header doesn't contain boundary"
		boundary = content_type.split("=")[1].encode()
		remain_bytes = int(self.headers['content-length'])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		if boundary not in line:
			return False, "Content NOT begin with boundary"
		line = self.rfile.readline()
		remain_bytes -= len(line)
		fn = re.findall(r'Content-Disposition.*name="file"; filename="(.*)"', line.decode())
		if not fn:
			return False, "Can't find out file name..."
		path = self.translate_path(self.path)
		fn = os.path.join(path, fn[0])
		line = self.rfile.readline()
		remain_bytes -= len(line)
		line = self.rfile.readline()
		remain_bytes -= len(line)
		try:
			out = open(fn, 'wb')
		except IOError:
			return False, "Can't create file to write, do you have permission to write?"
 
		preline = self.rfile.readline()
		remain_bytes -= len(preline)
		while remain_bytes > 0:
			line = self.rfile.readline()
			remain_bytes -= len(line)
			if boundary in line:
				preline = preline[0:-1]
				if preline.endswith(b'\r'):
					preline = preline[0:-1]
				out.write(preline)
				out.close()
				return True, "File '%s' upload success!" % fn
			else:
				out.write(preline)
				preline = line
		return False, "Unexpect Ends of data."
 

def _get_best_family(*address):
	infos = socket.getaddrinfo(
		*address,
		type=socket.SOCK_STREAM,
		flags=socket.AI_PASSIVE,
	)
	family, type, proto, canonname, sockaddr = next(iter(infos))
	return family, sockaddr




def serve_forever(port=8000, bind=None, directory="."):
	"""
	This runs an HTTP server on port 8000 (or the port argument).
	"""

	# ensure dual-stack is not disabled; ref #38907
	class DualStackServer(http.server.ThreadingHTTPServer):
		def server_bind(self):
			# suppress exception when protocol is IPv4
			with contextlib.suppress(Exception):
				self.socket.setsockopt(
					socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
			return super().server_bind()

		def finish_request(self, request, client_address):
			self.RequestHandlerClass(request, client_address, self,
										directory=directory)


	HandlerClass=MySimpleHTTPRequestHandler
	ServerClass=DualStackServer
	protocol="HTTP/1.0"
	ServerClass.address_family, addr = _get_best_family(bind, port)
	HandlerClass.protocol_version = protocol
	with ServerClass(addr, HandlerClass) as httpd:
		host, port = httpd.socket.getsockname()[:2]
		url_host = f'[{host}]' if ':' in host else host
		print(
			f"Serving HTTP on {host} port {port} "
			f"(http://{url_host}:{port}/) ..."
		)
		try:
			httpd.serve_forever()
		except KeyboardInterrupt:
			print("\nKeyboard interrupt received, exiting.")
			sys.exit(0)
 
if __name__ == '__main__':
	import argparse
	parser = argparse.ArgumentParser()
	parser.add_argument('--bind', '-b', metavar='ADDRESS',
						help='specify alternate bind address '
							 '(default: all interfaces)')
	parser.add_argument('--directory', '-d', default=os.getcwd(),
						help='specify alternate directory '
							 '(default: current directory)')
	parser.add_argument('port', action='store', default=8000, type=int,
						nargs='?',
						help='specify alternate port (default: 8000)')
	args = parser.parse_args()

	# 在主线程中执行
	# serve_forever(
	# 	port=args.port,
	# 	bind=args.bind,
	#	directory = args.directory
	# )

	# 在子线程中执行,这样主线程可以执行异步工作
	thread1 = threading.Thread(name='t1',target= serve_forever,
			kwargs={
				"port" : args.port,
				"bind" : args.bind,
				"directory" : args.directory
			}
		)
	thread1.start()
	# thread1.join()
	

	

使用帮助:

>python3 simple_http_file_server.py --help              
usage: simple_http_file_server.py [-h] [--cgi] [--bind ADDRESS] [--directory DIRECTORY] [port]

positional arguments:
  port                  specify alternate port (default: 8000)

optional arguments:
  -h, --help            show this help message and exit
  --cgi                 run as CGI server
  --bind ADDRESS, -b ADDRESS
                        specify alternate bind address (default: all interfaces)
  --directory DIRECTORY, -d DIRECTORY
                        specify alternate directory (default: current directory)

使用示例:

>python3 simple_http_file_server.py -d ~ 12345
Serving HTTP on :: port 12345 (http://[::]:12345/) ...
::ffff:127.0.0.1 - - [30/Nov/2023 09:35:06] "GET / HTTP/1.1" 200 -

。。。

访问:

浏览器中访问:http://127.0.0.1:12345/


参考:

        Python3 实现简单HTTP服务器(附带文件上传)_python3 -m http.server-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/206271.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue中.sync修饰符与$emit(update:xxx)双向数据绑定

文章目录 一、单向数据流二、props父子传值2.1、父组件2.2、子组件2.3、优缺点2.3.1、优点2.3.2、缺点 三、.sync修饰符双向绑定3.1、父组件3.2、子组件3.3、优缺点3.3.1、优点3.3.2、缺点 3.4、[文档](https://v2.cn.vuejs.org/v2/guide/components-custom-events.html#sync-%…

JAVA全栈开发 day14_集合(Collection\List接口、数据结构、泛型)

一、数组 数组是一个容器&#xff0c;可以存入相同类型的多个数据元素。 数组局限性&#xff1a; ​ 长度固定&#xff1a;&#xff08;添加–扩容&#xff0c; 删除-缩容&#xff09; ​ 类型是一致的 对象数组 &#xff1a; int[] arr new int[5]; … Student[] arr …

ThermalLabel SDK for .NET 13.0.23.1113 Crack

ThermalLabel SDK for .NET 是一个 .NET 典型类库&#xff0c;它允许用户和开发人员创建非常创新的条码标签并将其发布在 zebra ZPL、EPL、EPSON ESC、POS 以及 Honeywell intermec 指纹中通过在 VB.NET 或 C# 上编写 .NET 纯代码来实现热敏打印机&#xff0c;以实现项目框架的…

shell编程系列(6)-使用Sort进行数据排序

文章目录 前言使用Sort命令文本排序sort命令的选项如下&#xff1a;基本排序对单个指定列进行排序对多个指定列进行排序删除重复的行 结语 前言 shell脚本通常用作一些自动化的操作&#xff0c;但是在有些场景下例如科研运算&#xff0c;有时候会产生大量的运算结果文件&#…

单片机AVR单片机病房控制系统设计+源程序

一、系统方案 设计一个可容8张床位的病房呼叫系统。要求每个床位都有一个按钮&#xff0c;当患者需要呼叫护士时&#xff0c;按下按钮&#xff0c;此时护士值班室内的呼叫系统板上显示该患者的床位号&#xff0c;并蜂鸣器报警。当护士按下“响应”键时&#xff0c;结束当前呼叫…

【idea】设置鼠标滚轮控制缩放大小

1、点击file 选择Setting 2、点击Editor 下面的 General 3、勾选 Mouse Control 下面的 Change font size with CtrlMouse Wheel in 4、点级apply 5、按 ctrl键 鼠标滚轮缩放字体的大小

python进阶技巧

1.闭包 通过函数嵌套&#xff0c;可以让内部函数依赖外部变量&#xff0c;可以避免全局变量的污染问题 闭包注意事项&#xff1a; 总结&#xff1a; 2.装饰器 2.1装饰器的一般写法 2.2 装饰器的语法糖写法 def outer(func):def inner():print(睡了)func()print(起床)retur…

java--多态

1.什么是多态 多态是在继承/实现的情况下的一种现象&#xff0c;表现为&#xff1a;对象多态、行为多态。 2.多态的具体代码体现 编译看左边&#xff0c;运行看右边 3.多态的前提 有继承/实现关系&#xff1b;存在父类引用子类对象&#xff1b;存在方法重写 4.多态的一个注…

Navicat连接Oracle数据库记录

oracle服务如图&#xff1a; navicat连接设置&#xff1a; 测试连接成功&#xff01; 连接创建成功&#xff01;

语音信号处理:librosa

1 librosa介绍 Librosa是一个用于音频和音乐分析的Python库&#xff0c;专为音乐信息检索&#xff08;Music Information Retrieval&#xff0c;MIR&#xff09;社区设计。自从2015年首次发布以来&#xff0c;Librosa已成为音频分析和处理领域中最受欢迎的工具之一。它提供了一…

python中的函数定义

默认参数 注&#xff1a; 在Python中&#xff0c;print(x, and y both correct)是一条打印语句&#xff08;print statement&#xff09;&#xff0c;用于将一条消息输出到控制台或终端。它的作用是将变量x的值和字符串and y both correct同时输出到屏幕上。 在这个语句中&…

基于AT89C51单片机的节日彩灯门设计

1&#xff0e;设计任务 本设计采用单片机为主控芯片&#xff0c;结合外围电路组成彩灯门的控制系统器&#xff0c;用来控制16个彩色的LED发光&#xff0c;实现彩色亮点的循环移动&#xff1b;通过软件编程实现各种各样的彩色亮点平面循环移动&#xff0c;该彩色控制器可以通过输…

LRU缓存淘汰策略的实现——LinkedHashMap哈希链表

LRU&#xff08;最近最少使用&#xff09;缓存淘汰策略可以通过使用哈希链表实现。LinkedHashMap 是 Java 中提供的一种数据结构&#xff0c;它综合了哈希表和双向链表的特点&#xff0c;非常适合用来实现 LRU 缓存。 LinkedHashMap 内部维护了一个哈希表和一个双向链表。哈希…

树与二叉树堆:经典OJ题集

目录 查找值为x的结点&#xff1a; 思路分析&#xff1a; 单值二叉树&#xff1a; 示例&#xff1a; 思路分析&#xff1a; 相同的树&#xff1a; 示例&#xff1a; 思路分析&#xff1a; 二叉树的前序遍历&#xff1a;——使用前序遍历把结点元素放入数组中 题…

Gartner发布降低软件供应链安全风险指南

软件供应链攻击已呈三位数增长&#xff0c;但很少有组织采取措施评估这些复杂攻击的风险。这项研究提供了安全和风险管理领导者可以用来检测和预防攻击并保护其组织的三种实践。 主要发现 尽管软件供应链攻击急剧增加&#xff0c;但安全评估并未作为供应商风险管理或采购活动的…

030 - STM32学习笔记 - ADC(四) 独立模式多通道DMA采集

030 - STM32学习笔记 - ADC&#xff08;四&#xff09; 独立模式多通道DMA采集 中断模式和DMA模式进行单通道模拟量采集&#xff0c;这节继续学习独立模式多通道DMA采集&#xff0c;使用到的引脚有之前使用的PC3&#xff08;电位器&#xff09;&#xff0c;PA4&#xff08;光敏…

js事件流与事件委托/事件代理

1 事件流 事件流分为两步&#xff0c;一是捕获&#xff0c;二是冒泡 1.1 捕获概念 捕获就是从最高层一层一层往下找到最内部的节点 1.2 冒泡概念 捕获到最小节点后&#xff0c;一层一层往上返回&#xff0c;像是气泡从最底部往上冒一样&#xff0c;由于水深不同压强不同&…

如何在工作中好好利用CHAT?

问CHAT&#xff1a;智能微网和综合能源项目实施过程中存在的管理风险和应对措施 CHAT回复&#xff1a;在智能微网和综合能源项目实施过程中&#xff0c;可能存在的管理风险和应对措施主要有以下几个方面&#xff1a; 1. 技术风险&#xff1a;所使用的技术和设备可能还处在研发…

某60区块链安全之薅羊毛攻击实战一学习记录

区块链安全 文章目录 区块链安全薅羊毛攻击实战一实验目的实验环境实验工具实验原理实验内容薅羊毛攻击实战一 实验步骤EXP利用 薅羊毛攻击实战一 实验目的 学会使用python3的web3模块 学会分析以太坊智能合约薅羊毛攻击漏洞 找到合约漏洞进行分析并形成利用 实验环境 Ubun…

[架构之路-255]:目标系统 - 设计方法 - 软件工程 - 软件设计 - 架构设计 - 软件架构风格

前言&#xff1a; 风格是指在不同领域内&#xff0c;人们在表达自己的过程中&#xff08;如艺术、音乐、文化、时尚、建筑、软件系统等&#xff09;&#xff0c;所选择的、相对稳定的表达方式和特征的总和。在不同领域内都存在着多种不同的风格。 在艺术领域内&#xff0c;也…