目录
一. 简介:
1. 为什么需要请求日志
二. 日志模块组成
1. 对应日志表创建(包含日志记录的关键字段)
2. 编写日志记录静态方法
3. 在Flask中捕获请求日志
4. 捕获异常并记录错误日志
5. 编写日志接口数据展示
6. 写入数据展示
三. 日志信息格式处理问题
1. 如何处理流式响应(Passthrough)
2. 如何记录响应数据(如JSON响应)
3. 总结与优化建议
四 . 结尾
一. 简介:
在Flask应用中,日志记录是重要的功能之一,它可以帮助开发人员跟踪请求的处理情况,快速定位错误,并且有助于应用的监控与调试。本文将介绍如何在Flask应用中实现请求日志记录,包括如何记录请求的各种信息(如请求数据、响应数据、错误信息等)并将其保存到数据库中。我们还会演示如何捕获不同级别的日志(信息级别、错误级别),并讨论如何处理复杂的响应数据(如流式响应)。
1. 为什么需要请求日志
日志记录可以帮助开发人员和运维团队了解应用的行为和状态,尤其是在生产环境中。通过记录每次请求的详细信息,开发人员能够:
- 跟踪应用性能。
- 快速定位和调试错误。
- 为监控和安全审计提供数据支持。
例如,在出现错误时,记录详细的堆栈信息、请求的URL、请求参数和响应数据,能够帮助你迅速分析问题并进行修复。
二. 日志模块组成
1. 对应日志表创建(包含日志记录的关键字段)
# 日志表
class Log(db.Model,TimestampMixin):
"""日志表"""
__tablename__ = 't_logs'
__table_args__ = {
'mysql_engine': 'InnoDB',
'comment': '日志表'
}
id = db.Column(db.Integer, primary_key=True, autoincrement=True,comment='id')
user_id = db.Column(db.Integer,comment='用户ID') # 用户ID
ip_address = db.Column(db.String(50),comment='ip地址') # ip地址
level = db.Column(db.String(50),comment='日志级别') # 日志级别
message = db.Column(db.Text,comment='日志内容') # 日志内容
module = db.Column(db.String(100),comment='模块名称') # 模块名称
method = db.Column(db.String(50),comment='方法名称') # 方法名称
url = db.Column(db.String(255),comment='请求的URL') # 请求的URL
request_data = db.Column(db.Text,comment='请求数据') # 请求数据
response_data = db.Column(db.Text,comment='响应数据') # 响应数据
error_code = db.Column(db.String(50),comment='错误代码') # 错误代码
stack_trace = db.Column(db.Text,comment='堆栈追踪') # 堆栈追踪
hostname = db.Column(db.String(100),comment='服务器主机名') # 服务器主机名
context = db.Column(db.String(255),comment='上下文信息') # 上下文信息
def __repr__(self):
return f"<ErrorLog(id={self.id}, ip_address={self.ip_address}, level={self.level}, message={self.message})>"
2. 编写日志记录静态方法
# db.session 进行数据提交等操作
@staticmethod
def log_message(session, exception=None, user_id=None,error_code=None,level=None, message=None, **kwargs):
try:
# 获取请求数据
request_data=''
if request.method == 'GET':
request_data = str(dict(request.args)) # 直接获取查询参数
elif request.method == 'POST':
if request.is_json:
request_data = str(request.get_json()) # 获取 JSON 数据
else:
request_data = str(request.form) # 获取表单数据
# 获取请求的其他信息
ip_address = request.remote_addr # 获取客户端IP地址
url = request.url # 获取请求的URL
response_data = str(kwargs.get('response_data', '')) # 获取响应数据
stack_trace = traceback.format_exc() if exception else "" # 获取堆栈追踪
hostname = socket.gethostname() # 获取服务器主机名
context = kwargs.get('context', 'Production') # 上下文(默认生产环境)
if not message:
message = str(exception) if exception else "Unknown error"
# 检查错误响应数据并跳过日志记录
try:
# 将响应数据从字符串转换为字典
response_dict = json.loads(response_data)
if isinstance(response_dict, dict) and response_dict.get("message") == "内部服务器错误":
return # 跳过日志记录
except json.JSONDecodeError:
# 如果无法解析 JSON,则跳过判断
pass
# print(user_id)
# 创建并保存日志条目
log = Log(
user_id=user_id if user_id else 0,
level=level,
message=message,
ip_address=ip_address,
url=url,
request_data=request_data,
response_data=response_data,
stack_trace=stack_trace,
hostname=hostname,
context=context,
method=request.method,
module=request.blueprint,
error_code=error_code,
)
# 保存日志条目
db.session.add(log)
db.session.commit()
except Exception as e:
print(e)
pass
3. 在Flask中捕获请求日志
在Flask中,我们可以通过使用 after_request
钩子来捕获请求信息。这个钩子在每次请求处理完毕后执行,适合用于记录日志。示例如下:
# 请求成功日志记录
@app.after_request
def after_request(response):
# 获取当前用户信息
user_id = None
level = "INFO" # 你可以根据需要动态设置日志级别,如根据响应状态码判断
message = "请求成功!" # 请求成功的默认信息
# 记录日志
try:
current_user = get_jwt_identity()
# 获取用户信息(从数据库中获取用户信息做对比)
user_info = db.session.query(User).filter(User.username == current_user).first()
user_id = user_info.id
except RuntimeError as e:
# 捕获没有 JWT 时抛出的 RuntimeError 异常
# 不做任何事情,直接跳过日志记录
pass
except Exception as e:
print(e)
pass
# 如果响应处于 passthrough 模式,则不能直接访问 response.data
if not response.direct_passthrough:
# 正常情况下获取响应数据并转化为文本
response_data = response.get_data(as_text=True)
else:
# 如果是 passthrough 模式,说明是流式响应或直接传递模式
response_data = "{}" # 或者根据需求设置适当的默认值
if user_id:
Log.log_message(
db.session,
user_id=user_id if user_id else 0,
level=level,
message=message,
response_data=response_data,
context="Production", # 可选的环境信息
error_code=200,
)
return response
在这个例子中,我们通过 after_request
钩子来处理每个请求后执行的日志记录。获取响应数据时,根据响应的类型决定是否直接获取 response.data
。
4. 捕获异常并记录错误日志
在实际开发中,应用程序往往会遇到异常。为了保证日志的完整性,我们可以捕获异常并将其记录下来。特别是对于HTTP 500类错误,应该记录详细的堆栈信息。
# 异常处理日志记录
@app.errorhandler(Exception)
def handle_exception(e):
# 捕获所有异常,记录日志
level = "ERROR"
message = str(e) # 将异常转换为字符串
stack_trace = traceback.format_exc() # 获取堆栈追踪
# 获取当前用户信息
user_id = None
try:
current_user = get_jwt_identity()
user_info = db.session.query(User).filter(User.username == current_user).first()
user_id = user_info.id
except RuntimeError:
# 如果没有 JWT 则不记录用户ID
pass
if user_id:
# 记录异常日志
Log.log_message(
db.session,
exception=e,
user_id=user_id,
level=level,
message='请求失败!',
stack_trace=stack_trace,
context="Production",
error_code=500,
)
print(message)
# 返回通用的500错误响应
return {"message": "内部服务器错误"}, 500
在这个例子中,我们捕获了通用的异常,并将错误信息、堆栈追踪以及其他日志信息保存到数据库。
5. 编写日志接口数据展示
from flask import Response, jsonify, Flask, request, Blueprint,url_for
from configs import *
from modules.Tables import *
from sqlalchemy import func
# 创建蓝图,对应的register目录(激活操作视图)
log_view = Blueprint('log_view', __name__)
# 日志信息展示
@log_view.route('/log_data', methods=['GET'])
@jwt_required()
def log_data():
# 获取分页参数
page = request.args.get('page', default=1, type=int) # 当前页码
per_page = request.args.get('per_page', default=15, type=int) # 每页显示条目数量
level = request.args.get('level') # 日志等级
# 定义筛选列表
filters = []
# 当筛选条件存在时添加到列表
if level:
filters.append(Log.level == level)
# 构建查询条件
query = and_(*filters) if filters else True # 如果 filters 为空,默认条件为 True
# 查询主区域点并进行分页
pagination = db.session.query(Log.id,
Log.user_id,Log.ip_address, Log.level,Log.message,Log.module,Log.method,Log.url,
Log.request_data, Log.response_data,Log.error_code, Log.stack_trace,Log.hostname,Log.context,Log.created_at
).filter(query).paginate(page=page, per_page=per_page, error_out=False)
# 获取分页后的数据
data_list = [
{
'id': item.id,
'user_id': item.user_id,
'ip_address': item.ip_address,
'level': item.level,
'message': item.message,
'module': item.module,
'method': item.method,
'url': item.url,
'request_data': item.request_data if item.response_data else item.stack_trace,
'response_data': item.response_data,
'error_code': item.error_code,
# 'stack_trace':item.stack_trace,
'hostname':item.hostname,
'context':item.context,
'created_at': format_datetime(item.created_at),
}
for item in pagination.items
]
# 构造返回结果,包括分页信息
response = {
'code': 200,
'data': data_list,
'pagination': {
'current_page': pagination.page,
'total_pages': pagination.pages,
'total_items': pagination.total,
'per_page': pagination.per_page
}
}
return jsonify({'code':200,'data':response})
6. 写入数据展示
三. 日志信息格式处理问题
1. 如何处理流式响应(Passthrough)
Flask中的流式响应(passthrough)不允许直接访问 response.data
,因此需要特别处理。通过 response.get_data(as_text=True)
方法,我们可以安全地获取响应数据并将其存储到日志中。如果响应不可序列化(如流式数据),可以跳过或记录默认值。
if not response.direct_passthrough:
response_data = response.get_data(as_text=True)
else:
response_data = "Non-serializable response"
2. 如何记录响应数据(如JSON响应)
对于返回JSON数据的响应,我们可以通过 response.get_data(as_text=True)
获取响应体的内容。这种方式适用于大多数JSON响应,确保我们可以将响应内容记录到日志中。
例如,在捕获日志时,我们可以如下处理响应数据:
response_data = response.get_data(as_text=True)
对于非JSON响应,使用 str()
也能保证将其转化为可记录的格式。
3. 总结与优化建议
- 在Flask应用中,通过
@app.after_request
钩子可以轻松地记录每次请求的日志。 - 根据不同的请求方法(GET/POST)和响应类型(JSON、流式数据等),动态调整日志记录方式。
- 使用
try-except
语句捕获异常,并记录详细的错误信息以帮助后期的调试和维护。
通过这些方法,我们可以高效、全面地记录Flask应用中的日志,并为后期的性能优化、故障排查提供支持。
四 . 结尾
日志记录是开发和运维过程中不可或缺的一部分,它能帮助我们及时发现问题并做出调整。通过本文介绍的日志记录方式,我们能够方便地捕获请求和响应的详细信息,同时处理各种异常和特殊情况。希望这篇文章能够帮助你更好地理解并实现Flask中的日志记录。