性能测试 - Locust WebSocket client

Max.Bai

2024.10

0. 背景

Locust 是性能测试工具,但是默认只支持http协议,就是默认只有http的client,需要其他协议的测试必须自己扩展对于的client,比如下面的WebSocket client。

1. WebSocket test Client
“”“
Max.Bai
Websocket Client
”“”
import json
import logging
import secrets
import threading
import time
from typing import Callable, Optional

import websocket
from locust import events

logger = logging.getLogger(__name__)


class WebSocketClient:
    def __init__(self, host: str, log_messages: bool = False):
        self._host: str = host
        self._id: str = secrets.token_hex(8)
        self._alias: Optional[str] = None
        self._ws: Optional[websocket.WebSocketApp] = None

        self.log_messages = log_messages
        self.count_recv_type = False
        self.heartbeat_auto_respond = False
        self._recv_messages: list = []
        self.messages: list = []
        self._sent_messages: list = []

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, type, value, traceback):
        self.disconnect()

    @property
    def tag(self) -> str:
        tag = f"{self._host} <{self._id}>"
        if self._alias:
            tag += f"({self._alias})"
        return tag

    def connect(
        self, alias: Optional[str] = None, headers: Optional[dict] = None, on_message: Optional[Callable] = None
    ):
        if not self._ws:
            self._alias = alias
            self._ws = websocket.WebSocketApp(
                url=self._host,
                header=headers,
                on_open=self._on_open,
                on_message=on_message if on_message else self._on_message,
                on_close=self._on_close,
            )
            thread = threading.Thread(target=self._ws.run_forever)
            thread.daemon = True
            thread.start()
            time.sleep(3)
        else:
            logger.warning("An active WebSocket connection is already established.")

    def is_connected(self) -> bool:
        return self._ws is not None

    def disconnect(self):
        if self._ws:
            self._ws.close()
            self._alias = None
        else:
            logger.warning("No active WebSocket connection established.")

    def _on_open(self, ws):
        logger.debug(f"[WebSocket] {self.tag} connected.")
        events.request.fire(
            request_type="ws_client",
            name="connect",
            response_time=0,
            response_length=0,
        )

    def _on_message(self, ws, message):
        recv_time = time.time()
        recv_time_ms = int(recv_time * 1000)
        recv_time_ns = int(recv_time * 1000000)
        logger.debug(f"[WebSocket] {self.tag} message received: {message}")
        if self.log_messages:
            self._recv_messages.append(message)
        self.messages.append(message)

        # public/respond-heartbeat
        if self.heartbeat_auto_respond:
            if "public/heartbeat" in message:
                self.send(message.replace("public/heartbeat", "public/respond-heartbeat"))

        if self.count_recv_type:
            try:
                msg = json.loads(message)
                id = str(msg.get("id", 0))
                if len(id) == 13:
                    resp_time = recv_time_ms - int(id)
                elif len(id) == 16:
                    resp_time = (recv_time_ns - int(id)) / 1000
                elif len(id) > 13:
                    resp_time = recv_time_ms - int(id[:13])
                else:
                    resp_time = 0
                method = msg.get("method", "unknown")
                code = msg.get("code", "unknown")
                error = msg.get("message", "unknown")
                # send_time = int(msg.get("nonce", 0))
                if method in ["public/heartbeat", "private/set-cancel-on-disconnect"]:
                    events.request.fire(
                        request_type="ws_client",
                        name=f"recv {method}",
                        response_time=0,
                        response_length=len(msg),
                    )
                elif code == 0:
                    events.request.fire(
                        request_type="ws_client",
                        name=f"recv {method} {code}",
                        # response_time=recv_time - send_time,
                        response_time=resp_time,
                        response_length=len(msg),
                    )
                else:
                    events.request.fire(
                        request_type="ws_client",
                        name=f"recv {method} {code}",
                        response_time=resp_time,
                        response_length=len(msg),
                        exception=error,
                    )
            except Exception as e:
                events.request.fire(
                    request_type="ws_client",
                    name="recv error",
                    response_time=0,
                    response_length=len(msg),
                    exception=str(e),
                )

    def _on_close(self, ws, close_status_code, close_msg):
        logger.debug(f"[WebSocket] {self.tag} closed.")
        self._ws = None

        events.request.fire(
            request_type="ws_client",
            name="close",
            response_time=0,
            response_length=0,
        )

    def set_on_message(self, on_message: Callable):
        self._ws.on_message = on_message

    def send(self, message: str):
        if self._ws:
            self._ws.send(data=message)
            if self.log_messages:
                self._sent_messages.append(message)
            logger.debug(f"[WebSocket] {self.tag} message sent: {message}")
        else:
            logger.warning(f"No active [WebSocket] {self.tag} connection established.")
            raise ConnectionError("No active [WebSocket] connection established.")

    def clear(self):
        self._recv_messages = []
        self._sent_messages = []
        self.messages = []

    def expect_messages(
        self,
        matcher: Callable[..., bool],
        count: int = 1,
        timeout: int = 10,
        interval: int = 1,
    ) -> list:
        """Expect to receive one or more filtered messages.

        Args:
            matcher (Callable): A matcher function used to filter the received messages.
            count (int, optional): Number of messages to be expected before timeout. Defaults to 1.
            timeout (int, optional): Timeout in seconds. Defaults to 10.
            interval (int, optional): Interval in seconds. Defaults to 1.

        Returns:
            list: A list of messages filtered by the matcher.
        """

        deadline: float = time.time() + timeout
        result: list = []  # messages filtered by the matcher
        seen: list = []  # messages already seen by the matcher to be excluded from further matching

        while time.time() < deadline:
            snapshot: list = [*self._recv_messages]

            for element in seen:
                if element in snapshot:
                    snapshot.remove(element)

            result.extend(filter(matcher, snapshot))
            if len(result) >= count:
                break

            seen.extend(snapshot)
            time.sleep(interval)

        if len(result) < count:
            logger.warning(
                f"({self.tag}) Expected to receive {count} messages, but received only {len(result)} messages."
            )

        return result
2. 如何使用
class PrivateWsUser(User):

    def on_start(self):
        self.ws_client=WebSocketClient("wss://abc.pp.com/chat", log_message=True)
        self.ws_client.connect()


    @task
    def send_hello()
        self.ws_client.send("hello world")
    
    
            
3. 扩展

可自行扩展on_message 方法,上面的on_message 方法是json 格式的信息处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/954564.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

迅为RK3568开发板篇OpenHarmony配置HDF驱动控制LED-新增 topeet子系统-编写 bundle.json文件

bundle.json 文件内容如下所示&#xff1a; 下面是对各个字段的解释&#xff1a; 1. name: "ohos/demos" - 这是组件或项目的名称&#xff0c;这里表示它属于 OHOS&#xff08;OpenHarmony OS&#xff09;生态系统下的一个名为"demos"的组件。 2. descri…

STM32 物联网智能家居 (三) 输入子系统

STM32 物联网智能家居 (三) 输入子系统 下面是物联网智能家居的输入子系统&#xff0c;见下图&#xff0c;在输入子系统中会实现按键输入、网络输入、标准输入Scanf&#xff0c;其中的网络输入放入到网络子系统中进行讲解。 一、输入子系统核心功能 STM32 物联网智能家居输入…

Windows 正确配置android adb调试的方法

下载适用于 Windows 的 SDK Platform-Tools https://developer.android.google.cn/tools/releases/platform-tools?hlzh-cn 设置系统变量&#xff0c;路径为platform-tools文件夹的绝对路径 点击Path添加环境变量 %adb%打开终端输入adb shell 这就成功了&#xff01;

【C#深度学习之路】如何使用C#实现Yolo8/11 Segment 全尺寸模型的训练和推理

【C#深度学习之路】如何使用C#实现Yolo8/11 Segment 全尺寸模型的训练和推理 项目背景项目实现推理过程训练过程 项目展望写在最后项目下载链接 本文为原创文章&#xff0c;若需要转载&#xff0c;请注明出处。 原文地址&#xff1a;https://blog.csdn.net/qq_30270773/article…

线性回归超详解

目录 一、回归问题 vs 分类问题 二、线性回归 1、一句话理解 2、数学推导 2.1 线性函数表示 2.2 损失函数 2.3 梯度下降 2.3.1 什么是梯度 2.3.2 梯度下降目标 2.3.3 过程 2.3.4 迭代公式 3、特征预处理 3.1 为什么要预处理 3.2 数据归一化方法 1&#xff09;最小…

docker 部署 Kafka 单机和集群

一、准备工作 安装 Docker 确保本机已安装 Docker。可以通过以下命令检查 Docker 是否已安装&#xff1a;docker --version如果未安装&#xff0c;可以访问 Docker 官网下载并安装 Docker Desktop&#xff08;Windows 和 Mac&#xff09;或使用包管理器安装&#xff08;Linux&…

Uniapp开发安卓App,配置第一次打开软件出现的弹窗-隐私政策提示框

这里是直接使用的uniapp官方所提供的“原生隐私政策提示框”&#xff0c;废话不多说&#xff0c;直接上教程&#xff01; 1.manifest.json—>安卓/IOS启动界面配置—>勾选“使用原生隐私政策提示框”2.勾选后&#xff0c;在你的项目下就会出现一个文件&#xff0c;andro…

微信小程序:播放音频

在小程序开发中&#xff0c;音频播放是一个重要的功能。本文将详细介绍小程序音频播放的相关知识点&#xff0c;帮助开发者更好地掌握小程序音频播放的实现方法。 一、小程序音频播放的基本流程 在小程序中&#xff0c;音频播放的基本流程如下&#xff1a; 获取音频数据&#…

Unity解决滑动条的value值的滑动条消失问题

在这里我们看到原本的value的滑动条消失了 解决办法 把编辑器的边框往外面拉一下就可以了&#xff08;之前遇到这个问题还重启了几次unity没想到居然是这个问题&#xff09;

Mac上安装Label Studio

在Mac上安装Anaconda并随后安装Label Studio&#xff0c;可以按照以下步骤进行&#xff1a; 1. 在Mac上安装Anaconda 首先&#xff0c;你需要从Anaconda的官方网站下载适用于Mac的安装程序。访问Anaconda官网&#xff0c;点击“Download Anaconda”按钮&#xff0c;选择适合M…

微软震撼发布:Phi-4语言模型登陆Hugging Face

近日&#xff0c;微软公司在Hugging Face平台上正式发布了其最新的语言模型Phi-4&#xff0c;这一发布标志着人工智能技术的又一重要进步。Phi-4模型以其140亿参数的高效配置&#xff0c;在复杂推理任务中表现出色&#xff0c;特别是在数学领域&#xff0c;更是展现出了卓越的能…

使用WebdriverIO和Appium测试App

1.新建项目 打开Webstorm新建项目 打开终端输入命令 npm init -y npm install wdio/cli allure-commandline --save-dev npx wdio config 然后在终端依次选择如下&#xff1a; 然后在终端输入命令&#xff1a; npm install wdio/local-runnerlatest wdio/mocha-frameworkla…

【opencv】第7章 图像变换

7.1 基 于OpenCV 的 边 缘 检 测 本节中&#xff0c;我们将一起学习OpenCV 中边缘检测的各种算子和滤波器——Canny 算子、Sobel 算 子 、Laplacian 算子以及Scharr 滤波器。 7.1.1 边缘检测的一般步骤 在具体介绍之前&#xff0c;先来一起看看边缘检测的一般步骤。 1.【第…

浙江安吉成新照明电器:Acrel-1000DP 分布式光伏监控系统应用探索

安科瑞吕梦怡 18706162527 摘 要&#xff1a;分布式光伏发电站是指将光伏发电组件安装在用户的建筑物屋顶、空地或其他适合的场地上&#xff0c;利用太阳能进行发电的一种可再生能源利用方式&#xff0c;与传统的大型集中式光伏电站相比&#xff0c;分布式光伏发电具有更灵活…

Linux检查磁盘占用情况

1.检查使用情况 df -h发现是/dev/vda1占用很高 2.查看/dev/vda1文件夹 cd /dev/vda1发现不是文件夹 3.继续查看使用情况 df -h *4.原因可能是文件已经删除但是进程还在&#xff0c;没有释放空间 5.查看删除操作的进程 lsof -n | grep deleted6.杀死进程 kill -9 PID

向量数据库Milvus详解

向量数据库Milvus详解 0. 什么是向量数据库? 在现实世界中,并非所有数据都可以整齐地放到行和列中。在处理图像、视频和自然语言等复杂的非结构化数据时尤其如此。这就是向量数据库的用武之地。 向量数据库是一种以高维向量的形式来存储数据的数据库,这些向量本质上是表示…

海豚调度DolphinScheduler-3.1.9配置windows本地开发环境

源代码下载地址https://dolphinscheduler.apache.org/zh-cn/docs/3.1.9 1.Zookeeper安装与使用 如图下载解压zookeeper安装包&#xff0c;并创建data和log目录 下载地址 https://archive.apache.org/dist/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz 进入…

springCloudGateway+nacos自定义负载均衡-通过IP隔离开发环境

先说一下想法&#xff0c;小公司开发项目&#xff0c;参考若依框架使用的spring-cloud-starter-gateway和spring-cloud-starter-alibaba-nacos, 用到了nacos的配置中心和注册中心&#xff0c;有多个模块&#xff08;每个模块都是一个服务&#xff09;。 想本地开发&#xff0c;…

大模型训练_硬件微调知识增强

目录 关键硬件 大模型类型 垂域训练技术 微调技术 领域大模型训练trick 知识增强 关键硬件 GPU GPU擅长处理图形渲染和数据并行任务&#xff0c;可以同时处理大量的矩阵运算&#xff0c;在科学计算、人工智能、游戏开发等领域应用广泛。 显卡 显卡是一种完整的硬件设…

linux分配磁盘空间命令

使用命令lsblk查询linux磁盘空间时&#xff0c;发现空间并没有被分配完 如图&#xff0c;600G&#xff0c;但实际分配了一共199G&#xff0c;剩余500G&#xff0c;我们需要通过命令进行剩余存储的分配。 思路&#xff1a;创建新的分区->更新内核分区表->初始化新分区作…