Flask实现高效日志记录模块

目录

一. 简介:

1. 为什么需要请求日志

二.  日志模块组成

1.  对应日志表创建(包含日志记录的关键字段)

2.  编写日志记录静态方法

3.  在Flask中捕获请求日志

4.  捕获异常并记录错误日志

5.  编写日志接口数据展示

6.  写入数据展示

三. 日志信息格式处理问题

1. 如何处理流式响应(Passthrough)

2. 如何记录响应数据(如JSON响应)

3. 总结与优化建议

四 . 结尾


一. 简介:

在Flask应用中,日志记录是重要的功能之一,它可以帮助开发人员跟踪请求的处理情况,快速定位错误,并且有助于应用的监控与调试。本文将介绍如何在Flask应用中实现请求日志记录,包括如何记录请求的各种信息(如请求数据、响应数据、错误信息等)并将其保存到数据库中。我们还会演示如何捕获不同级别的日志(信息级别、错误级别),并讨论如何处理复杂的响应数据(如流式响应)。


1. 为什么需要请求日志

日志记录可以帮助开发人员和运维团队了解应用的行为和状态,尤其是在生产环境中。通过记录每次请求的详细信息,开发人员能够:

  • 跟踪应用性能。
  • 快速定位和调试错误。
  • 为监控和安全审计提供数据支持。

例如,在出现错误时,记录详细的堆栈信息、请求的URL、请求参数和响应数据,能够帮助你迅速分析问题并进行修复。

二.  日志模块组成

  1.  对应日志表创建(包含日志记录的关键字段

# 日志表
class Log(db.Model,TimestampMixin):
    """日志表"""
    __tablename__ = 't_logs'
    __table_args__ = {
        'mysql_engine': 'InnoDB',
        'comment': '日志表'
    }

    id = db.Column(db.Integer, primary_key=True, autoincrement=True,comment='id')
    user_id = db.Column(db.Integer,comment='用户ID')  # 用户ID
    ip_address = db.Column(db.String(50),comment='ip地址')  # ip地址
    level = db.Column(db.String(50),comment='日志级别')  # 日志级别
    message = db.Column(db.Text,comment='日志内容')  # 日志内容
    module = db.Column(db.String(100),comment='模块名称')  # 模块名称
    method = db.Column(db.String(50),comment='方法名称')  # 方法名称
    url = db.Column(db.String(255),comment='请求的URL')  # 请求的URL
    request_data = db.Column(db.Text,comment='请求数据')  # 请求数据
    response_data = db.Column(db.Text,comment='响应数据')  # 响应数据
    error_code = db.Column(db.String(50),comment='错误代码')  # 错误代码
    stack_trace = db.Column(db.Text,comment='堆栈追踪')  # 堆栈追踪
    hostname = db.Column(db.String(100),comment='服务器主机名')  # 服务器主机名
    context = db.Column(db.String(255),comment='上下文信息')  # 上下文信息

    def __repr__(self):
        return f"<ErrorLog(id={self.id}, ip_address={self.ip_address}, level={self.level}, message={self.message})>"

2.  编写日志记录静态方法

    # db.session 进行数据提交等操作
    @staticmethod
    def log_message(session, exception=None, user_id=None,error_code=None,level=None, message=None, **kwargs):
        try:
            # 获取请求数据
            request_data=''
            if request.method == 'GET':
                request_data = str(dict(request.args))  # 直接获取查询参数
            elif request.method == 'POST':
                if request.is_json:
                    request_data = str(request.get_json())  # 获取 JSON 数据
                else:
                    request_data = str(request.form)  # 获取表单数据

            # 获取请求的其他信息
            ip_address = request.remote_addr  # 获取客户端IP地址
            url = request.url  # 获取请求的URL
            response_data = str(kwargs.get('response_data', ''))  # 获取响应数据
            stack_trace = traceback.format_exc() if exception else ""  # 获取堆栈追踪
            hostname = socket.gethostname()  # 获取服务器主机名
            context = kwargs.get('context', 'Production')  # 上下文(默认生产环境)

            if not message:
                message = str(exception) if exception else "Unknown error"

            # 检查错误响应数据并跳过日志记录
            try:
                # 将响应数据从字符串转换为字典
                response_dict = json.loads(response_data)
                if isinstance(response_dict, dict) and response_dict.get("message") == "内部服务器错误":
                    return  # 跳过日志记录
            except json.JSONDecodeError:
                # 如果无法解析 JSON,则跳过判断
                pass
            # print(user_id)
            # 创建并保存日志条目
            log = Log(
                user_id=user_id if user_id else 0,
                level=level,
                message=message,
                ip_address=ip_address,
                url=url,
                request_data=request_data,
                response_data=response_data,
                stack_trace=stack_trace,
                hostname=hostname,
                context=context,
                method=request.method,
                module=request.blueprint,
                error_code=error_code,
            )

            # 保存日志条目
            db.session.add(log)
            db.session.commit()

        except Exception as e:
            print(e)
            pass

3. 在Flask中捕获请求日志

在Flask中,我们可以通过使用 after_request 钩子来捕获请求信息。这个钩子在每次请求处理完毕后执行,适合用于记录日志。示例如下:

# 请求成功日志记录
@app.after_request
def after_request(response):
    # 获取当前用户信息
    user_id = None
    level = "INFO"  # 你可以根据需要动态设置日志级别,如根据响应状态码判断
    message = "请求成功!"  # 请求成功的默认信息
    # 记录日志
    try:
        current_user = get_jwt_identity()
        # 获取用户信息(从数据库中获取用户信息做对比)
        user_info = db.session.query(User).filter(User.username == current_user).first()
        user_id = user_info.id
    except RuntimeError as e:
        # 捕获没有 JWT 时抛出的 RuntimeError 异常
        # 不做任何事情,直接跳过日志记录
        pass
    except Exception as e:
        print(e)
        pass

    # 如果响应处于 passthrough 模式,则不能直接访问 response.data
    if not response.direct_passthrough:
        # 正常情况下获取响应数据并转化为文本
        response_data = response.get_data(as_text=True)
    else:
        # 如果是 passthrough 模式,说明是流式响应或直接传递模式
        response_data = "{}"  # 或者根据需求设置适当的默认值

    if user_id:
        Log.log_message(
            db.session,
            user_id=user_id if user_id else 0,
            level=level,
            message=message,
            response_data=response_data,
            context="Production",  # 可选的环境信息
            error_code=200,
        )
    return response

在这个例子中,我们通过 after_request 钩子来处理每个请求后执行的日志记录。获取响应数据时,根据响应的类型决定是否直接获取 response.data

4. 捕获异常并记录错误日志

在实际开发中,应用程序往往会遇到异常。为了保证日志的完整性,我们可以捕获异常并将其记录下来。特别是对于HTTP 500类错误,应该记录详细的堆栈信息。

# 异常处理日志记录
@app.errorhandler(Exception)
def handle_exception(e):
    # 捕获所有异常,记录日志
    level = "ERROR"
    message = str(e)  # 将异常转换为字符串
    stack_trace = traceback.format_exc()  # 获取堆栈追踪

    # 获取当前用户信息
    user_id = None
    try:
        current_user = get_jwt_identity()
        user_info = db.session.query(User).filter(User.username == current_user).first()
        user_id = user_info.id
    except RuntimeError:
        # 如果没有 JWT 则不记录用户ID
        pass
    if user_id:
        # 记录异常日志
        Log.log_message(
            db.session,
            exception=e,
            user_id=user_id,
            level=level,
            message='请求失败!',
            stack_trace=stack_trace,
            context="Production",
            error_code=500,
        )
    print(message)
    # 返回通用的500错误响应
    return {"message": "内部服务器错误"}, 500

在这个例子中,我们捕获了通用的异常,并将错误信息、堆栈追踪以及其他日志信息保存到数据库。

5. 编写日志接口数据展示

from flask import Response, jsonify, Flask, request, Blueprint,url_for
from configs import *
from modules.Tables import *
from sqlalchemy import func

# 创建蓝图,对应的register目录(激活操作视图)
log_view = Blueprint('log_view', __name__)

# 日志信息展示
@log_view.route('/log_data', methods=['GET'])
@jwt_required()
def log_data():

    # 获取分页参数
    page = request.args.get('page', default=1, type=int)  # 当前页码
    per_page = request.args.get('per_page', default=15, type=int)  # 每页显示条目数量
    level = request.args.get('level')  # 日志等级

    # 定义筛选列表
    filters = []
    # 当筛选条件存在时添加到列表
    if level:
        filters.append(Log.level == level)

    # 构建查询条件
    query = and_(*filters) if filters else True  # 如果 filters 为空,默认条件为 True


    # 查询主区域点并进行分页
    pagination = db.session.query(Log.id,
                         Log.user_id,Log.ip_address, Log.level,Log.message,Log.module,Log.method,Log.url,
                         Log.request_data, Log.response_data,Log.error_code, Log.stack_trace,Log.hostname,Log.context,Log.created_at
                 ).filter(query).paginate(page=page, per_page=per_page, error_out=False)

    # 获取分页后的数据
    data_list = [
        {
            'id': item.id,
            'user_id': item.user_id,
            'ip_address': item.ip_address,
            'level': item.level,
            'message': item.message,
            'module': item.module,
            'method': item.method,
            'url': item.url,
            'request_data': item.request_data if item.response_data else item.stack_trace,
            'response_data': item.response_data,
            'error_code': item.error_code,
            # 'stack_trace':item.stack_trace,
            'hostname':item.hostname,
            'context':item.context,
            'created_at': format_datetime(item.created_at),

        }
        for item in pagination.items
    ]


    # 构造返回结果,包括分页信息
    response = {
        'code': 200,
        'data': data_list,
        'pagination': {
            'current_page': pagination.page,
            'total_pages': pagination.pages,
            'total_items': pagination.total,
            'per_page': pagination.per_page
        }
    }

    return jsonify({'code':200,'data':response})


 6. 写入数据展示

三. 日志信息格式处理问题

1. 如何处理流式响应(Passthrough)

Flask中的流式响应(passthrough)不允许直接访问 response.data,因此需要特别处理。通过 response.get_data(as_text=True) 方法,我们可以安全地获取响应数据并将其存储到日志中。如果响应不可序列化(如流式数据),可以跳过或记录默认值。

if not response.direct_passthrough:
    response_data = response.get_data(as_text=True)
else:
    response_data = "Non-serializable response"

2. 如何记录响应数据(如JSON响应)

对于返回JSON数据的响应,我们可以通过 response.get_data(as_text=True) 获取响应体的内容。这种方式适用于大多数JSON响应,确保我们可以将响应内容记录到日志中。

例如,在捕获日志时,我们可以如下处理响应数据:

response_data = response.get_data(as_text=True)

对于非JSON响应,使用 str() 也能保证将其转化为可记录的格式。

3. 总结与优化建议

  • 在Flask应用中,通过 @app.after_request 钩子可以轻松地记录每次请求的日志。
  • 根据不同的请求方法(GET/POST)和响应类型(JSON、流式数据等),动态调整日志记录方式。
  • 使用 try-except 语句捕获异常,并记录详细的错误信息以帮助后期的调试和维护。

通过这些方法,我们可以高效、全面地记录Flask应用中的日志,并为后期的性能优化、故障排查提供支持。

四 . 结尾

日志记录是开发和运维过程中不可或缺的一部分,它能帮助我们及时发现问题并做出调整。通过本文介绍的日志记录方式,我们能够方便地捕获请求和响应的详细信息,同时处理各种异常和特殊情况。希望这篇文章能够帮助你更好地理解并实现Flask中的日志记录。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972884.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【学习笔记】Cadence电子设计全流程(一)Cadence 生态及相关概念

【学习笔记】Cadence电子设计全流程&#xff08;一&#xff09;Cadence 生态及相关概念 1.1 Cadence 生态系统及各模块关系1.2 Cadence相较于Altium Designer在硬件设计中的优势 1.1 Cadence 生态系统及各模块关系 Cadence 提供了一套完整的电子设计自动化 (EDA) 工具链&#…

【Linux Redis】关于用docker拉取Redis后,让虚拟机运行起来redis,并使得其可以连接到虚拟机外的navicat。

步骤一&#xff1a;拉取Redis镜像 docker pull redis 这个命令会下载最新版本的Redis镜像到你的本地Docker仓库中。你也可以指定一个具体的版本号&#xff0c;例如docker pull redis:6.2.6&#xff0c;来拉取特定版本的Redis镜像。 如果拉取遇到问题请参考【Linux AnolisOS】关…

蓝桥与力扣刷题(蓝桥 裁纸刀)

本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 题目&#xff1a;小蓝有一个裁纸刀&#xff0c;每次可以将一张纸沿一条直线裁成两半。 小蓝用一张纸打印出两行三列共 6 个二维码&#xff0c;至少使用九次裁出来&#xff0c…

pdf转换成word在线 简单好用 支持批量转换 效率高 100%还原

pdf转换成word在线 简单好用 支持批量转换 效率高 100%还原 在数字化办公的浪潮中&#xff0c;文档格式转换常常让人头疼不已&#xff0c;尤其是 PDF 转 Word 的需求极为常见。PDF 格式虽然方便阅读和传输&#xff0c;但难以编辑&#xff0c;而 Word 格式却能灵活地进行内容修…

Django ModelForm使用(初学)

1.目的是根据员工表字段&#xff0c;实现一个新增员工的数据填写页面 2.在views.py文件中按下面的格式写 定义 ModelForm 类&#xff1a;UserModelForm &#xff08;自己命名的类名&#xff09;使用时需要导入包 定义视图函数&#xff1a;user_model_form_add&#xff08;在函…

基于大牛直播SDK的Android平台低延迟RTSP|RTMP播放与录像技术实践

技术背景 随着直播、安防监控、远程会议等场景对实时性与稳定性要求的提升&#xff0c;低延迟流媒体播放与录像成为核心技术需求。大牛直播SDK的SmartPlayer模块提供了完整的解决方案&#xff0c;支持RTSP、RTMP协议的多实例播放、硬件解码、实时快照、录像管理等功能&#xf…

小怿学习日记(七) | Unreal引擎灯光架构

灯光的布局对于HMI场景中车模的展示效果有着举足轻重的地位。本篇内容将简单介绍ES3.1的相关知识&#xff0c;再深入了解Unreal引擎中车模的灯光以及灯光架构。 一、关于ES3.1 1.1 什么是ES3.1 ES3.1这个概念对于美术的同学可能比较陌生&#xff0c;ES3.1指的是OpenGL ES3.1&…

DeepSeek 接入PyCharm实现AI编程!(支持本地部署DeepSeek及官方DeepSeek接入)

前言 在当今数字化时代&#xff0c;AI编程助手已成为提升开发效率的利器。DeepSeek作为一款强大的AI模型&#xff0c;凭借其出色的性能和开源免费的优势&#xff0c;成为许多开发者的首选。今天&#xff0c;就让我们一起探索如何将DeepSeek接入PyCharm&#xff0c;实现高效、智…

广度优先搜索详解--BFS--蒟蒻的学习之路

1.什么是广度优先搜索? 广度优先搜索&#xff08;Breadth-First Search&#xff0c;简称BFS&#xff09;是一种遍历或搜索树和图的算法&#xff0c;也称为宽度优先搜索&#xff0c;BFS算法从图的某个节点开始&#xff0c;依次对其所有相邻节点进行探索和遍历&#xff0c;然后再…

. Unable to find a @SpringBootConfiguration(默认软件包中的 Spring Boot 应用程序)

解决&#xff1a; 新建一个包即可 问题&#xff1a; 默认软件包中的 Spring Boot 应用程序。 原因&#xff1a; 默认包的定义 &#xff1a; 如果一个 Java 类没有使用 package 声明包名&#xff0c;则该类会被放置在默认包中。Spring Boot 遵循 Java 的包管理约定&#xff…

DeepSeek企业级部署实战指南:从服务器选型到Dify私有化落地

对于个人开发者或尝鲜者而言&#xff0c;本地想要部署 DeepSeek 有很多种方案&#xff0c;但是一旦涉及到企业级部署&#xff0c;则步骤将会繁琐很多。 比如我们的第一步就需要先根据实际业务场景评估出我们到底需要部署什么规格的模型&#xff0c;以及我们所要部署的模型&…

“三次握手”与“四次挥手”:TCP传输控制协议连接过程

目录 什么是TCP协议 “三次握手”建立连接 “四次挥手”断开连接 “三次握手”和“四次挥手”的反思 总结 什么是TCP协议 想象一下&#xff0c;你和远方的朋友要进行一场电话交流&#xff0c;但这通电话不仅仅是随便聊聊&#xff0c;而是要传递一封重要的信件。为了确保这…

网络运维学习笔记 012网工初级(HCIA-Datacom与CCNA-EI)某机构新增:GRE隧道与EBGP实施

文章目录 GRE隧道&#xff08;通用路由封装&#xff0c;Generic Routing Encapsulation&#xff09;协议号47实验&#xff1a;思科&#xff1a;开始实施&#xff1a; 华为&#xff1a;开始实施&#xff1a; eBGP实施思科&#xff1a;华为&#xff1a; GRE隧道&#xff08;通用路…

Android 动态加入Activity 时 manifest 注册报错解决。使用manifestPlaceholders 占位

需求如下&#xff1a; 项目 测试demo 有多个渠道&#xff0c;部分渠道包含支付功能&#xff0c;在主测试代码外&#xff0c;需要一个单独 Activity 调用测试代码。 MainActivityPayActivity渠道A包含不包含渠道B包含包含 因为支付功能需要引入对应的 moudule&#xff0c;因此…

【koa】05-koa+mysql实现数据库集成:连接和增删改查

前言 前面我们已经介绍了第二阶段的第1-4点内容&#xff0c;本篇介绍第5点内容&#xff1a;数据库集成&#xff08;koamysql&#xff09; 也是第二阶段内容的完结。 一、学习目标 在koa项目中正常连接数据库&#xff0c;对数据表进行增删改查的操作。 二、操作步骤 本篇文章…

linux--关于makefile

makefile文件 可以指定编译顺序&#xff0c;这样方便一个项目的多个文件要编译的挨个操作的麻烦。 makefile文件的命名&#xff1a;makefile 或者 Makefile 必须是这俩&#xff0c;系统才能识别 规则的书写语法如下&#xff1a; 一个makefile内可以有多个规则 目标:依赖a 依…

俄罗斯方块游戏完整代码示例

以下是一个基于Cocos Creator引擎开发的俄罗斯方块游戏的完整代码示例。该游戏实现了俄罗斯方块的基本功能&#xff0c;并且代码整合在单个文件中&#xff0c;无需任何外部依赖&#xff0c;可以直接在浏览器中运行。 1. 创建Cocos Creator项目 首先&#xff0c;确保你已经安装了…

学习kafka和flink

kafka kafka安装一套流程 方法一&#xff1a;启动需安装zookeeper和kafka 【Kafka】Windows下安装Kafka&#xff08;图文记录详细步骤&#xff09; 安装Tzq2018写的上面链接安装的&#xff0c;一切很顺利&#xff0c;除了zookeeper的环境变量不管如何配置都不管用&#xff0…

SLT-加载表添加字段重新刷数

1、LTRC数据提供->输入表名->停止加载/复制 2、LTRS添加表字段&#xff08;只有在加载部分字段的情况下&#xff09;&#xff1b; 在查看修改概览页将需要的字段选中并删除&#xff0c;删除的字段自动归集到已修改概览里。 3、数据提供-》输入表名-》创建/数据库视图&am…

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

【黑马点评优化】2-Canel实现多级缓存&#xff08;RedisCaffeine&#xff09;同步 0 背景1 配置MySQL1.1 开启MySQL的binlog功能1.1.1 找到mysql配置文件my.ini的位置1.1.2 开启binlog 1.2 创建canal用户 2 下载配置canal2.1 canal 1.1.5下载2.2 配置canal2.3 启动canal2.4 测试…