【Python】Locust持续优化:InfluxDB与Grafana实现数据持久化与可视化分析

目录

前言

influxDB

安装运行InfluxDB

用Python 上报数据到influxdb

ocust 数据写入到 influx

Locust的生命周期

上报数据

优化升级

配置Grafana

总结

 资料获取方法


前言

在进行性能测试时,我们需要对测试结果进行监控和分析,以便于及时发现问题并进行优化。

Locust在内存中维护了一个时间序列数据结构,用于存储每个事件的统计信息。 这个数据结构允许我们在Charts标签页中查看不同时间点的性能指标,但是正因为Locust WebUI上展示的数据实际上是存储在内存中的。所以在Locust测试结束后,这些数据将不再可用。 如果我们需要长期保存以便后续分析测试数据,可以考虑将Locust的测试数据上报到外部的数据存储系统,如InfluxDB,并使用Grafana等可视化工具进行展示和分析。

本文将介绍如何使用Locust进行负载测试,并将测试数据上报到InfluxDB。同时,我们将使用Grafana对测试数据进行展示和分析。

最终效果:

image

influxDB

InfluxDB是一款开源的时间序列数据库,专为处理大量的时间序列数据而设计。时间序列数据通常是按照时间顺序存储的数据点,每个数据点都包含一个时间戳和一个或多个与之相关的值。这种数据类型在许多场景下都非常常见,如监控系统、物联网设备、金融市场数据等。在这些场景下,数据上报是一种关键的需求,因为它可以帮助我们实时了解系统的状态和性能。

注: InfluxDB 开源的时单机版本,集群版本并未开元,但是对于普通用户的日常场景已经完全够用。

以下是关于InfluxDB的关键特性和优势的表格:

特性优势
高性能针对时间序列数据进行了优化,可以快速地写入和查询大量数据。
数据压缩使用了高效的数据压缩算法,在磁盘上节省大量空间。
自带查询语言具有一种名为InfluxQL的查询语言,类似于SQL,便于查询和分析数据。
数据保留策略支持设置数据保留策略,可以自动清除过期的数据。
易于集成具有丰富的API和客户端库,可以轻松地与其他系统和工具集成。

安装运行InfluxDB

如果你已经安装了Docker,可以直接使用官方的InfluxDB镜像来运行InfluxDB:

docker run -p 8086:8086 -v $PWD:/var/lib/influxdb influxdb:1.8

此命令将在Docker容器中启动InfluxDB,并将主机上的8086端口映射到容器的8086端口。

点击查看在如何在不同操作系统中如何安装 InfluxDB

用Python 上报数据到influxdb

首先,确保已经安装了influxdb库:

pip install influxdb

然后,使用以下代码上报数据到InfluxDB:
以下是一个使用Python操作InfluxDB上报数据的示例,对照MySQL进行注释:

import time
from influxdb import InfluxDBClient

# 连接到InfluxDB(类似于连接到MySQL数据库)
client = InfluxDBClient(host='localhost', port=8086)

# 创建数据库(类似于在MySQL中创建一个新的数据库)
client.create_database('mydb')

# 切换到创建的数据库(类似于在MySQL中选择一个数据库)
client.switch_database('mydb')

# 上报数据(类似于在MySQL中插入一条记录)
data = [
    {
        # 在InfluxDB中,measurement相当于MySQL中的表名
        "measurement": "cpu_load",
        # tags相当于MySQL中的索引列,用于快速查询
        "tags": {
            "host": "server01",
            "region": "us-west"
        },
        # time为时间戳,是InfluxDB中的关键字段
        "time": int(time.time_ns()),
        # fields相当于MySQL中的数据列,用于存储实际的数据值
        "fields": {
            "value": 0.64
        }
    }
]

# 写入数据(类似于在MySQL中执行INSERT语句)
client.write_points(data)

在这个示例中,我们首先连接到InfluxDB(类似于连接到MySQL数据库),然后创建一个名为mydb的数据库(类似于在MySQL中创建一个新的数据库),并切换到创建的数据库(类似于在MySQL中选择一个数据库)。接着,我们准备了一条名为cpu_load的数据(在InfluxDB中,measurement相当于MySQL中的表名),并为数据添加了hostregion标签(类似于MySQL中的索引列)。最后,我们将数据写入到InfluxDB中(类似于在MySQL中执行INSERT语句)。

执行上面的代码后我们可以看到我们的操作成功了:

image

如果我们安装了influx-cli就可以在命令行中直接查询刚才写入的数据:

bingohe@MacBook-Pro ~ $ /usr/local/Cellar/influxdb@1/1.11.1/bin/influx 
Connected to http://localhost:8086 version 1.8.10
InfluxDB shell version: 1.11.1
> show databases;
name: databases
name
----
_internal
mydb
> use mydb
Using database mydb
> show measurements;
name: measurements
name
----
cpu_load
> select * from cpu_load;
name: cpu_load
time                host     region  value
----                ----     ------  -----
1688874870046897000 server01 us-west 0.64

image

点击查看如何使用命令行访问InfluxDB

image

ocust 数据写入到 influx

在 【Python】万字长文,Locust 性能测试指北(上) 中我们提到过Locust的生命周期,我们也通过Locust生命周期实现了集合点的功能。现在我们一起来通过它实现测试数据的实时展示。

Locust的生命周期

点击查看Locust的生命周期

上报数据

我们先来看看常用的事件里面可以获取到的数据:

import time
from locust import HttpUser, task, between, events


@events.request.add_listener
def request_handler(*args, **kwargs):
    print(f"request args: {args}")
    print(f"request kwargs: {kwargs}")


@events.worker_report.add_listener
def worker_report_handlers(*args, **kwargs):
    print(f"worker_report args: {args}")
    print(f"worker_report kwargs: {kwargs}")


@events.test_start.add_listener
def test_start_handlers(*args, **kwargs):
    print(f"test_start args: {args}")
    print(f"test_start kwargs: {kwargs}")


@events.test_stop.add_listener
def test_stop_handlers(*args, **kwargs):
    print(f"test_stop args: {args}")
    print(f"test_stop kwargs: {kwargs}")


class QuickstartUser(HttpUser):
    wait_time = between(1, 2)

    @task
    def root(self):
        with self.client.get("/", json={"time": time.time()}, catch_response=True) as rsp:
            rsp_json = rsp.json()
            if rsp_json["id"] != 5:
                # 失败时上报返回的数据
                rsp.failure(f"{rsp_json}")

运行一次测试时能看到这些生命周期内的Locust 对外暴露的数据:

test_start args: ()
test_start kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
request args: ()
request kwargs: {'request_type': 'GET', 'response_time': 2.6886250000011103, 'name': '/', 'context': {}, 'response': <Response [200]>, 'exception': None, 'start_time': 1688888321.896039, 'url': 'http://0.0.0.0:10000/', 'response_length': 8}
request args: ()
request kwargs: {'request_type': 'GET', 'response_time': 2.735957999998817, 'name': '/', 'context': {}, 'response': <Response [200]>, 'exception': CatchResponseError("{'id': 6}"), 'start_time': 1688888323.421389, 'url': 'http://0.0.0.0:10000/', 'response_length': 8}
test_stopping args: ()
test_stopping kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}
test_stop args: ()
test_stop kwargs: {'environment': <locust.env.Environment object at 0x10c426c70>}

从上面的监控我们可以看到,每次任务启动和停止的时候会分别调用@events.test_start.add_listener@events.test_stop.add_listener装饰的函数,每次请求发生的的时候都会调用@events.request.add_listener 监听器装饰的函数,我们就是要利用这一点来进行数据的上报。

通过查看 Locust 的 EventHook 源码注释我们可以看到标准的使用方法:

#.../site-packages/locust/event.py
...
class EventHook:
    """
    Simple event class used to provide hooks for different types of events in Locust.

    Here's how to use the EventHook class::

        my_event = EventHook()
        def on_my_event(a, b, **kw):
            print("Event was fired with arguments: %s, %s" % (a, b))
        my_event.add_listener(on_my_event)
        my_event.fire(a="foo", b="bar")

    If reverse is True, then the handlers will run in the reverse order
    that they were inserted
    """
...

结合前面的写数据到 influxDB的实现,上报数据这一项一下子就变简单了:

简单实现每次请求数据上报 到 influxDB

下面的代码运行Locust测试后会自动创建一个locust_requests的 measurement,然后将每次请求的数据上报。

运行方法可以参考我的上一篇文章

import time
from datetime import datetime
from influxdb import InfluxDBClient

from locust import HttpUser, task, between, events

client = InfluxDBClient(host='localhost', port=8086, database="mydb")

def request(request_type, name, response_time, response_length, response, context, exception, url, start_time):
    _time = datetime.utcnow()
    was_successful = True
    if response:
        was_successful = 199 < response.status_code < 400
    tags = {
        'request_type': request_type,
        'name': name,
        'success': was_successful,
        'exception': str(exception),
    }
    fields = {
        'response_time': response_time,
        'response_length': response_length,
    }
    data = {"measurement": 'locust_requests', "tags": tags, "time": _time, "fields": fields}
    client.write_points([data])

# 在每次请求的时候通过前面定义的request函数写数据到 DB
events.request.add_listener(request)


class QuickstartUser(HttpUser):
    wait_time = between(1, 2)

    @task
    def root(self):
        with self.client.get("/", json={"time": time.time()}, catch_response=True) as rsp:
            rsp_json = rsp.json()
            if rsp_json["id"] != 5:
                rsp.failure(f"{rsp_json}")

上报的数据 influxDB 中查询到:

image

优化升级

上面的这个上报很粗糙,每次请求会上报一次数据,会影响实际的压测,如果我们将要上报的数据放在一个数据结构中中,异步的上报这个数据将极大的提升性能

# 将 __flush_points 方法中的写入操作放到一个单独的线程中,避免阻塞主线程,提高性能。
self.write_thread = threading.Thread(target=self.__write_points_worker)

# 批量写入
if len(self.write_batch) >= self.batch_size or time.time() - self.last_flush_time >= self.interval_ms / 1000:

# 使用 gzip 压缩上报的数据
influxdb_writer = InfluxDBWriter('localhost', 8086, 'mydb', batch_size=1000, gzip_enabled=True)
...

配置Grafana

在测试数据被上报到InfluxDB之后,可以通过Grafana进行数据展示和分析。需要先在Grafana中配置InfluxDB数据源,然后创建相应的图表和仪表盘。

在创建图表和仪表盘时,可以选择InfluxDB作为数据源,并使用InfluxQL查询语言进行数据查询和过滤。可以根据需要选择不同的图表类型和显示方式,以展示测试结果数据的趋势和变化。

总结

本文介绍了如何将Locust测试数据上报到InfluxDB,并通过Grafana进行展示和分析。通过将测试数据与监控工具相结合,可以更好地了解系统的性能和稳定性,及时发现问题并进行优化,也可以方便后续进行测试数据分析。希望本文能对大家有所帮助。


 资料获取方法

【留言777】

各位想获取源码等教程资料的朋友请点赞 + 评论 + 收藏,三连!

三连之后我会在评论区挨个私信发给你们~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/63977.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

flutter-GridView使用

先看效果 代码实现 import package:app/common/util/k_log_util.dart; import package:app/gen/assets.gen.dart; import package:app/pages/widget/top_appbar.dart; import package:flutter/cupertino.dart; import package:flutter/material.dart; import package:flutter_…

使用idea实现git操作大全(在项目开发中遇到的实际情况

使用idea实现git操作大全&#xff08;在项目开发中遇到的实际情况&#xff09; 1.安装git插件2.在开发中切记拉一个自己的分支 1.安装git插件 2.在开发中切记拉一个自己的分支 选中需要拉的分支&#xff0c;右键该分支&#xff0c;选中new breach from “分支”&#xff0c;点…

《数据中台实践指南(1.0 版)》发布,大数据技术标准推进委员会、联合行业专家、头部企业共同编制

导读 大数据技术标准推进委员会牵头&#xff0c;联合行业专家和头部企业共同编制《数据中台实践指南&#xff08;1.0 版&#xff09;》&#xff0c;梳理数据中台历史及概念&#xff0c;明确数据中台的核心能力&#xff0c;总结数据中台建设的前提条件和不同形态&#xff0c;给…

csdn崩溃了?每次都卡

反馈给了官方客服也没有响应&#xff0c;最近几周都是这样的高频率的转圈圈&#xff01;这个入口不受重视&#xff1f;这个对于csdn用户来说&#xff0c;是最最基本的入口 如果CSDN&#xff08;CSDN.net&#xff09;崩溃了&#xff0c;可能会对以下方面产生影响&#xff1a; 开…

Docker Compose构建lnmp

目录 Compose的优点 编排和部署 Compose原理 Compose应用案例 安装docker-ce 阿里云镜像加速器 安装docker-compose docker-compose用法 Yaml简介 验证LNMP环境 Compose的优点 先来了解一下我们平时是怎么样使用docker的&#xff1f;把它进行拆分一下&#xff1a; 1…

任务14、无缝衔接,MidJourney瓷砖(Tile)参数制作精良贴图

14.1 任务概述 在这个实验任务中,我们将深入探索《Midjourney Ai绘画》中的Tile技术和其在艺术创作中的具有挑战性的应用。此任务将通过理论学习与实践操作相结合的方式,让参与者更好地理解Tile的核心概念,熟练掌握如何在Midjourney平台上使用Tile参数,并实际运用到AI绘画…

运维作业—5

一.基于 CentOS 7 构建 LVS-DR 群集 1.配置LVS 2.第一台real server&#xff08;192.168.100.139:80&#xff09; 手工在RS端绑定VIP 手工在RS端抑制ARP响应 3.第二台real server&#xff08;192.168.100.140:80&#xff09; 安装arptables并启动 使用arptables实现抑制 测试…

解决Vs Code工具开发时 保存React文件时出现乱码情况

Vs Code工具开发时 保存React文件时出现乱码情况 插件库搜索:JS-CSS-HTML Formatter 把这个插件禁用或者卸载就解决保存时出现乱码的问题了; 如果没有解决,再看下面方案! 出现乱码问题通常是因为文件的编码格式不正确。您可以尝试以下解决方法&#xff1a; 确认文件编码格式&a…

以太网帧格式与吞吐量计算

以太网帧结构 帧大小的定义 以太网单个最大帧 6&#xff08;目的MAC地址&#xff09; 6&#xff08;源MAC地址&#xff09; 2&#xff08;帧类型&#xff09; 1500{IP数据包[IP头&#xff08;20&#xff09;DATA&#xff08;1480&#xff09;]} 4&#xff08;CRC校验&#xff…

MySQL(1)

MySQL创建数据库和创建数据表 创建数据库 1. 连接 MySQL mysql -u root -p 2. 查看当前的数据库 show databases; 3. 创建数据库 create database 数据库名; 创建数据库 4. 创建数据库时设置字符编码 create database 数据库名 character set utf8; 5. 查看和显示…

目前Java后端就业前景怎么样?

前言 并不乐观&#xff0c;看看现在的就业形式就知道了&#xff0c;基本上是僧多粥少的情况&#xff0c;你可能会看到很多编程语言排行榜或者流行榜中Java的排名很高&#xff0c;如同下面这种&#xff1a; 看排名确实可以粗略的得知语言当下的流行度、使用率&#xff0c;但是它…

Gof23设计模式之享元模式

1.定义 运用共享技术来有效地支持大量细粒度对象的复用。它通过共享已经存在的对象来大幅度减少需要创建的对象数量、避免大量相似对象的开销&#xff0c;从而提高系统资源的利用率。 2.结构 享元&#xff08;Flyweight &#xff09;模式中存在以下两种状态&#xff1a; 内…

webpack基础知识三:说说webpack中常见的Loader?解决了什么问题?

一、是什么 loader 用于对模块的"源代码"进行转换&#xff0c;在 import 或"加载"模块时预处理文件 webpack做的事情&#xff0c;仅仅是分析出各种模块的依赖关系&#xff0c;然后形成资源列表&#xff0c;最终打包生成到指定的文件中。如下图所示&#…

使用 RediSearch 在 Redis 中进行全文检索

原文链接&#xff1a; 使用 RediSearch 在 Redis 中进行全文检索 Redis 大家肯定都不陌生了&#xff0c;作为一种快速、高性能的键值存储数据库&#xff0c;广泛应用于缓存、队列、会话存储等方面。 然而&#xff0c;Redis 在原生状态下并不支持全文检索功能&#xff0c;这使…

linux初始命令

如果没有ip地址&#xff0c;配置&#xff1a; 查看当前时间&#xff1a; 指定格式查看时间&#xff1a; 修改时间&#xff1a; 查看时区&#xff1a; 设置时区&#xff1a; 查看当前工作目录&#xff1a; root的家目录就是根&#xff0c;普通用户家目录是home

ctfshow-web7

0x00 前言 CTF 加解密合集 CTF Web合集 0x01 题目 0x02 Write Up 通过尝试&#xff0c;发现是数字型的注入&#xff0c;并且同样是过滤了空格 判断字段 获取一下flag即可 1/**/union/**/select/**/1,flag,3/**/from/**/web7.flag#&passworda以上

MySQL日期常见的函数

-- 获取当天日期 -- 2023-06-20 select curdate();-- 获取当天年月日时分秒 select now();-- 日期运算 -- 2024-06-20 17:04:17 select date_add(now(),interval 1 year);-- 日期比较 -- 0 select datediff(now(),now());-- 日期MySQL对于日期类型数据如何查询 -- 获取指定日期…

【果树农药喷洒机器人】Part4:果树冠层图像实例分割模型优化

文章目录 一、引言二、数据集制作2.1图像采集2.2图像标注与增强 三、构建柑橘树冠实例分割模型结构3.1优化特征提取网络3.2U-Net替换FCN 一、引言 为准确获取柑橘树冠的生长信息&#xff0c;实现果树喷药机器人的精准喷施&#xff0c;对处于多种生长阶段的柑橘树冠进行图像分割…

ffmpeg源码编译成功,但是引用生成的静态库(.a)报错,报错位置在xxx_list.c,报错信息为某变量未定义

背景&#xff1a;本文是对上一个文章的补充&#xff0c;在源码编译之前&#xff0c;项目是有完整的ffmpeg编译脚本的&#xff0c;只不过新增了断点调试ffmpeg&#xff0c;所以产生的上面的文章&#xff0c;也就是说&#xff0c;我在用make编译成功后&#xff0c;再去做的源码编…

dfs之卒的遍历

题面 题目描述 在一张nm 的棋盘上&#xff08;如 6 行 7 列&#xff09;的最左上角(1,1) 的位置有一个卒。该卒只能向下或者向右走&#xff0c;且卒采取的策略是先向下&#xff0c;下边走到头就向右&#xff0c;请问从(1,1) 点走到 (n,m) 点可以怎样走&#xff0c;输出这些走法…