FastAPI 学习之路(四十九)WebSockets(五)修复接口测试中的问题

其实代码没有问题,但是我们忽略了一个问题,就是在正常的开发中,肯定是遇到过这样的情况,我们频繁的有客户端链接,断开连接,需要统一的管理这些链接,那么应该如何管理呢。其实可以声明一个类去管理这些链接。接下来我们看下该如何优化。

一、优化测试接口方法

1.定义链接管理类,处理所有链接

"""
websocket 链接管理
"""


from typing import List, Dict

from starlette.websockets import WebSocket


class ConnectionManager:

    def __init__(self):
        """存放链接"""
        self.active_connections: List[Dict[str, WebSocket]] = []

    async def connect(self, user: str, ws: WebSocket):
        """链接"""
        self.active_connections.append({"user": user, "ws": ws})

    async def disconnect(self, user: str, ws: WebSocket):
        """断开链接,移除"""
        self.active_connections.remove({"user": user, "ws": ws})

2.修改应用代码

我们增加了链接,移除链接的操作,那么对应修改下代码

from connection_tool import ConnectionManager
from starlette.websockets import WebSocketDisconnect
ws_manager = ConnectionManager()


@app.websocket("/items/ws")
async def websocket_endpoint(
    websocket: WebSocket,
    cookie_or_token: str = Depends(get_cookie_or_token),
):
    await websocket.accept()
    await ws_manager.connect(cookie_or_token, websocket)
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Message is: {data}")
    except WebSocketDisconnect as e:
        await ws_manager.disconnect(cookie_or_token, websocket)

3.测试 

这样我们在链接处理的时候就可以正常处理了。之前报错是因为我们没有正常的关闭链接导致的,那么我们再测试一下

"""
测试websockets
"""

from fastapi.testclient import TestClient
from main import app


def test_websocket():
    client = TestClient(app)
    with client.websocket_connect("/items/ws?token=fake-token") as websocket:
        websocket.send_text("Hello, this is testing websocket")
        data = websocket.receive_text()
        print(data)
        assert str(data) == f"Message is: Hello, this is testing websocket"


if __name__ == '__main__':
    test_websocket()

此时,发现代码不会再报错 

 二、增加测试用例并优化

1.增加用例代码

import unittest

from fastapi.testclient import TestClient

from main import app


class FastApiTestWeb(unittest.TestCase):

    def setUp(self) -> None:
        self.client = TestClient(app)

    def tearDown(self) -> None:
        self.client = None

    def test_websocket(self):
        with self.client.websocket_connect("/items/ws?token=fake-token") as websocket:
            websocket.send_text("Hello, this is using test case to test websocket")
            data = websocket.receive_text()
            print(data)
            assert str(data) == "Message is: Hello, this is using test case to test websocket"

    def test_websocket_again(self):
        with self.client.websocket_connect("/items/ws?token=fake-token") as websocket:
            websocket.send_text("Hello, this is using test case to test websocket again")
            data = websocket.receive_text()
            print(data)
            assert str(data) == "Message is: Hello, this is using test case to test websocket again"


if __name__ == '__main__':
    unittest.main()

2.执行用例

这样我们的一个测试用例就更加的完整了。我们执行正常是没有报错的

3. 查看代码的覆盖率 

pip install coverage

我们想要看下代码的覆盖率,应该如何看呢。我是用的coverage。

然后再report

  我们想看html测试报告,可以运行下 coverage html。

然后打开index.html

因为我的main.py还有其他的方法,我们还需要点进去看我们对应方法的覆盖率。

如果想要将覆盖率都达到100%,还需要针对其他方法增加测试用例。

到这里,我们对于WebSockets接口测试完毕,但是如果我们想实现上线通知,下线通知,如何实现呢?见下一节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/800340.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring容器Bean之XML配置属性的细节

1、简单值注入使用&#xff1c;property&#xff1e;的value属性可以换一种写法 2、简单值注入使用&#xff1c;property&#xff1e;的value属性值中有些特殊的字符&#xff0c;比如< 、> 的时候可以用包裹 3、对象注入使用&#xff1c;property&#xff1e;的ref属性时…

二 GD32 MCU 烧录说明

GD32 MCU提供了多种烧录方法&#xff0c;可在调试和生产等阶段进行便捷的烧录。GD32目前主要烧录方法有ISP烧录、SWD/JTAG在线下载、脱机烧录三种类型。 ISP烧录&#xff1a;使用串口或USB即可烧录&#xff0c;无需特殊工具支持。可根据协议自行定制下载方式&#xff0c;需要控…

请你谈谈:AnnotatedBeanDefinitionReader 显式地注册一个Bean到Spring容器,以及注册并解析配置类

为了深入探讨Spring框架中的beanDefinition对象&#xff0c;我们不可避免地要提及BeanFactoryPostProcessor这一核心类&#xff0c;它作为Spring的bean工厂后置处理器发挥着关键作用。接下来&#xff0c;我们将详细讨论BeanFactoryPostProcessor的执行时机&#xff0c;这是一个…

人工智能 (AI) 应用:一个高精度ASD 诊断和照护支持系统

自闭症谱系障碍&#xff08;ASD&#xff09;是一种多方面的神经发育状况&#xff0c;影响全球大约1/100的儿童&#xff0c;而在中国&#xff0c;这一比例高达1.8%&#xff08;引用自《中国0&#xff5e;6岁儿童孤独症谱系障碍筛查患病现状》&#xff09;&#xff0c;男童为2.6%…

ns3-gym入门(三):在opengym基础上实现一个小小的demo

因为官方给的"opengym""opengym-2"这两个例子都很简单&#xff0c;所以自己改了一个demo&#xff0c;把reward-action-state相互影响的关系表现出来 一、准备工作 在ns3.35/scratch目录下创建一个文件夹&#xff1a; &#xff08;后续的运行指令后面都需要…

C++字体库开发之字符显示四

freetype提取路径&#xff0c;转svg显示 std::string FontPath::toSvg(const Segment &seg) const {if (seg.pts.empty())return "";std::ostringstream strStream;for (const auto &pt : seg.pts) {if (!strStream.view().empty())strStream << &quo…

【linux】服务器重装系统之系统盘写入准备

【linux】服务器重装系统之系统盘写入准备 【创作不易&#xff0c;求点赞关注收藏】&#x1f600; 文章目录 【linux】服务器重装系统之系统盘写入准备一、前期准备1、准备一个U盘&#xff0c;并进行格式化2、下载UltralSO工具3、下载对应的Ubuntu版本 二、写入操作教程 一、…

gorm多表联合查询 Joins方法 LEFT JOIN , RIGHT JOIN , INNER JOIN, FULL JOIN 使用总结

gorm中多表联合查询&#xff0c;我们可以使用Joins来完成&#xff0c;这个Joins方法很灵活&#xff0c;我们可以非常方便的多多表进行联合查询&#xff0c; 我们先来看看这个方法的官方定义和使用示例&#xff1a; Joins方法定义和使用示例 当然我们这里要说的使用方式是官方示…

nginx生成自签名SSL证书配置HTTPS

一、安装nginx nginx必须有"--with-http_ssl_module"模块 查看nginx安装的模块&#xff1a; rootecs-7398:/usr/local/nginx# cd /usr/local/nginx/ rootecs-7398:/usr/local/nginx# ./sbin/nginx -V nginx version: nginx/1.20.2 built by gcc 9.4.0 (Ubuntu 9.4.0…

Vue.js 中的 immediate: true的作用

在使用 Vue.js 时&#xff0c;监听器 (watchers) 是一种非常重要的工具&#xff0c;它允许我们观察和响应数据的变化。 immediate: true 的作用 默认情况下&#xff0c;监听器只有在所监视的数据属性发生变化时才会触发回调函数。然而&#xff0c;有时候我们需要在组件初始化时…

Hadoop-29 ZooKeeper集群 Watcher机制 工作原理 与 ZK基本命令 测试集群效果 3台公网云服务器

章节内容 上节我们完成了&#xff1a; ZNode的基本介绍ZNode节点类型的介绍事务ID的介绍ZNode实机测试效果 背景介绍 这里是三台公网云服务器&#xff0c;每台 2C4G&#xff0c;搭建一个Hadoop的学习环境&#xff0c;供我学习。 之前已经在 VM 虚拟机上搭建过一次&#xff…

LVS+Nginx高可用集群---keepalived原理与实战

1.高可用集群架构keepalived双机主备原理 高可用&#xff1a;(HA) 部署nginx存在两台nginx。当主节点的nginx宕机停止服务的时候&#xff0c;nginx备用机起到跟nginx(主) keepalived的概念&#xff1a;解决单点故障&#xff1b;组件免费&#xff1b;可以实现高可用HA机制&…

《0基础》学习Python——第十一讲

一、lambda 匿名函数 lambda函数是一种匿名函数。它是一种快速定义单行函数的方法。与常规函数不同&#xff0c;lambda函数没有名称&#xff0c;也没有使用def关键字来定义。lambda函数通常用于一些简单的函数&#xff0c;可以在代码中快速定义和使用&#xff0c;而不需要为其定…

Hive的基本操作(查询)

1、基础查询 基本语法 select 字段列表|表达式|子查询 from 表(子查询|视图|临时表|普通表) where [not] 条件A and|or 条件B --先&#xff1a;面向原始行进行筛选 group by 字段A[,字段B,...] > 分组【去重处理】 having 聚合条件(非原始字段条件) --再&#x…

《梦醒蝶飞:释放Excel函数与公式的力量》12.3 DMIN函数

第12章&#xff1a;数据库函数 第三节 12.3 DMIN函数 12.3.1 简介 DMIN函数是Excel中的一个数据库函数&#xff0c;用于返回数据库或数据表中特定条件下某字段的最小值。DMIN函数在处理大规模数据、数据筛选和分析时非常有用。 12.3.2 语法 DMIN(database, field, criteri…

MYSQL 四、mysql进阶 9(数据库的设计规范)

一、为什么需要数据库设计 二、范 式 2.1 范式简介 在关系型数据库中&#xff0c;关于数据表设计的基本原则、规则就称为范式。 可以理解为&#xff0c;一张数据表的设计结 构需要满足的某种设计标准的 级别 。要想设计一个结构合理的关系型数据库&#xff0c;必须满足一定的…

LLM量化--AWQ论文阅读笔记

写在前面&#xff1a;近来大模型十分火爆&#xff0c;所以最近开启了一波对大模型推理优化论文的阅读&#xff0c;下面是自己的阅读笔记&#xff0c;里面对文章的理解并不全面&#xff0c;只将自己认为比较重要的部分摘了出来&#xff0c;详读的大家可以参看原文 原论文地址&am…

Leetcode—146. LRU 缓存【中等】(shared_ptr、unordered_map、list)

2024每日刷题&#xff08;143&#xff09; Leetcode—146. LRU 缓存 先验知识 list & unordered_map 实现代码 struct Node{int key;int value;Node(int key, int value): key(key), value(value) {} };class LRUCache { public:LRUCache(int capacity): m_capacity(capa…

axios以post方式提交表单形式数据

某些后端框架请求接口必须走form表单提交的那种形式&#xff0c;但前端很少有<form action"接口地址" method"post"></form>这种写法去提交表单数据&#xff0c;所以前端需要用axios模拟一个表单提交接口。 Content-Type 代表发送端&#xff0…

【.NET全栈】ASP.NET开发web应用——ASP.NET中的样式、主题和母版页

文章目录 前言一、在ASP.NET中应用CSS样式1、创建CSS样式&#xff08;1&#xff09;内联样式&#xff08;2&#xff09;内部样式表&#xff08;3&#xff09;外部样式表 2、应用CSS样式&#xff08;1&#xff09;菜鸟教程-简单例子&#xff08;2&#xff09;菜鸟教程-用户界面&…