【AIGC】训练数据入库(Milvus)

之前的文章有写如何获取数据、如何补充数据,也有说如何对数据进行清洗、如何使用结构化数据进行训练。但好像没有说如何将训练数据“入库”。这里说的入库不是指 MySQL 数据库,而是指向量检索库 Milvus。

众所周知,人工智能多用向量数据进行训练。数据先做向量处理并入库能有效减少训练时实时转换带来的性能消耗(之前在 Autokeras 训练时是读取 MySQL 结构化数据的,每次取数后都需要通过 Embedding 先做向量处理再训练)。

至于为什么要选择 Milvus ?这跟公司技术栈有关就不详述了。

部署 Milvus

Milvus 我是采用 Docker Compose 来部署的,主要是因为网上已有现成的 docker compose 脚本“milvus-standalone-docker-compose.yml”直接下载开箱即用即可。如下图:
image.png
整个脚本唯一值得关注的就是 Minio 管理后台账号和密码(若企业部署建议不要写在脚本中,透过添加 --env-file .env 参数的方式进行传递)。接着就通过以下命令启动就好了:

docker-compose -f milvus-standalone-docker-compose.yml up -d

启动结果如下图:
image.png
不得不说的是,Milvus 的 standalone 镜像是非常消耗资源的,为了避免运行过程中出现资源不足的情况,建议在 docker compose 脚本中根据自己机器的实际情况进行资源调整。

接着就可以通过 http://127.0.0.1:9001/login 访问 Minio 的管理后台,如下图:
image.png
至于 Minio 要如何使用,这个不是本节的重点就不再叙述了。

此外,按网上说法还需要部署一个名为 attu 的服务用于管理 Milvus 数据,执行下面命令即可:

docker run -d --name=attu -p 8000:3000 -e MILVUS_URL='宿主机ip':'milvus端口' zilliz/attu:v2.2.6

服务启动后通过 http://127.0.0.1:8000/ 访问 attu 的管理后台,如下图:
image.png
至此 Milvus 部署完毕。(后面若有空会补全 Milvus 相关的知识点内容,先挖个坑…)

代码实现

下面将讲讲如何用 python 将问答数据向量化并入库的。

  1. 创建测试入口并构建基础代码
from transformers import AutoTokenizer, AutoModel
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

# baai 模型名称与 model 实例
baai_model_name = "BAAI/bge-large-zh-v1.5"
baai_model = AutoModel.from_pretrained(baai_model_name)

...

def data_transformer_to_vetor():
    # 通过 AutoTokenizer 获取 baai 的 tokenizer 对象
    tokenizer = AutoTokenizer.from_pretrained(baai_model_name)
    # 创建 milvus 连接
    connections.connect(milvus_name, host=milvus_url, port=milvus_port)
    # 获取 milvus 中指定的数据集
    collection = find_milvus_collection()
    try:
        while True:
            # 根据条件 flag = 0 分页查询 MySQL 的问答数据
            db_data = mu.query_by_pagination(mysql_page, mysql_page_size, mysql_table_name, 'FLAG = 0')
            if db_data != '':
                db_data_json_arr = json.loads(db_data)
                vectors = []
                ids = []
                # 遍历问答数据集将“问题”和“答案”字段重新组装成一条字符串
                for db_data_json in db_data_json_arr:
                    text = f"问题:{db_data_json[1]} 答案:{db_data_json[2]}"
                    # 使用 baai 模型对字符串进行向量转换
                    vector = change_to_vetor(tokenizer, text)
                    # 将向量数据装载到一个数据集中
                    vectors.append(vector)
                    # 同时,也将 MySQL 中对应记录的 id 进行记录
                    ids.append(db_data_json[0])
                if len(vectors) > 0:
                    # 批量插入向量数据到 milvus 数据集中
                    collection.insert(data=[vectors])
                    # 更新 MySQL 对应的记录,看 flag 设置为 1,这样在下一个循环时就不会被重新筛选出来了
                    mu.update_by_ids(mysql_table_name, 'FLAG = 1', ids)
            else:
                break
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # 无论成功与否都释放数据集并关闭 milvus 连接
        collection.release()
        connections.disconnect(milvus_name)
...

if __name__ == '__main__':
    data_transformer_to_vetor()

在上面的代码中提到了 find_milvus_collection 和 change_to_vetor 函数,下面让我先对 change_to_vetor 函数进行解释。

  1. change_to_vetor 函数(baai 模型转换向量数据)
def change_to_vetor(tokenizer, text):
    # 定义输入内容
    inputs = tokenizer(text, return_tensors="pt",max_length=512, padding=True, truncation=True)
    # 调用 baai_model 并以 inputs 作为入参
    outputs = baai_model(**inputs)
    # outputs 转换成向量数据返回
    return outputs.last_hidden_state.mean(dim=1).squeeze().detach().numpy()

这个 3 行代码将非常有用,它不仅存储的时候需要使用,在后面做数据取回时也需要先将字符串向量化后,再提供给 Milvus 进行数据筛选的。因此有必要独立成一个函数,方便后面使用。

  1. find_milvus_collection 函数(获取 milvus 数据集)
def find_milvus_collection():
    # 使用 utility.has_collection 来判断在 milvus 中是否存在指定的数据集
    if utility.has_collection(milvus_collection_name, using=milvus_name):
        # 如果有直接获取
        collection = Collection(milvus_collection_name, using=milvus_name)
    else:
        # 没有则需要调用 create_milvus_collection 函数创建数据集
        collection = create_milvus_collection()
    # 加载数据到内存
    collection.load()
    return collection

这个函数首先使用了 pymilvus 的 utility.has_collection 进行判断,这里需要注意的是如果 Milvus 连接是自定义的情况下,必须加上 using 参数指向自定义连接,不然系统会使用默认“default”连接并报以下错误:

pymilvus.exceptions.ConnectionNotExistException: <ConnectionNotExistException: (code=1, message=should create connection first.)>

同理,在创建数据集时也需要用 using 参数指向自定义连接。

在获取到数据集之后需要将其加载到内存里面,不然在点击 attu 的 data preview 选项卡时会看到以下错误:

Failed to search: collection not loaded[collection=448914542771864105]

需要注意的是,加载大型集合可能需要一些时间,具体取决于集合的大小和系统资源。如果不再需要对集合进行操作,建议使用 collection.release() 方法将其从内存中释放,以节省资源。

  1. create_milvus_collection 函数(创建 Milvus 数据集)
def create_milvus_collection():
    # 创建 id 字段作为主键
    id_field = FieldSchema(name="id", dtype=DataType.INT64,is_primary=True, auto_id=True)
    # 创建向量数据存储字段
    vector_field = FieldSchema(name="vector_qa", dtype=DataType.FLOAT_VECTOR, dim=milvus_dim)
    # 定义schema
    schema = CollectionSchema(fields=[id_field, vector_field], description="TCM Question and Answer Dataset", enable_dynamic_field=True)
    # 创建数据集
    collection = Collection(name=milvus_collection_name,schema=schema, using=milvus_name)
    # 给向量数据字段添加索引
    collection.create_index(field_name="vector_qa", index_params={"metric_type": "L2", "index_type": "IVF_PQ", "params": {"nlist": milvus_dim}})
    return collection

这个创建数据集的函数其实也比较好理解,就像 MySQL 的表创建一样。

需要注意的是,在网上的例子中向量字段(本例子为:vector_field)会被创建成一个维度为 768 的 FLOAT_VECTOR 类型字段,但问答数据在经过 baai 模型转换后的默认维度为 1024,在数据保存的时候就会出现以下错误:

RPC error: [batch_insert], <ParamError: (code=1, message=Collection field dim is 768, but entities field dim is 1024)>, <Time:{'RPC start': '2024-04-07 16:02:07.393036', 'RPC error': '2024-04-07 16:02:07.393106'}>
Error: <ParamError: (code=1, message=Collection field dim is 768, but entities field dim is 1024)>

这时只需要将 dim 参数从 768 改为 1024 即可(可以简单理解为 MySQL 字段内容过长,那就加大字段长度呗)。如果维度一定要保持在 768,那么可以使用 PCA 进行降维处理。

既然需要重建数据集,那么如何将已有的数据集删除呢?

除了通过代码的方式删除外,我们还可以通过 attu 界面删除,如下图:

image.png

先确认是否当前的数据集,之后在上一页的可视化界面中删除即可。

image.png
至此,所有代码都已经编写完成了。噢,还有一件事儿。我这边在启动代码时会报这个警告:

/Users/yuanzhenhui/anaconda3/envs/transformer/lib/python3.11/site-packages/torch/_utils.py:831: UserWarning: TypedStorage is deprecated. It will be removed in the future and UntypedStorage will be the only storage class. This should only matter to you if you are using storages directly.  To access UntypedStorage directly, use tensor.untyped_storage() instead of tensor.storage()
  return self.fget.__get__(instance, owner)()

这个警告信息是由 PyTorch 库引发的,它提示 TypedStorage 类已经被弃用,在未来的版本中将被移除,届时只会保留 UntypedStorage 类。这个警告主要与直接使用 storage 有关。TypedStorage 和 UntypedStorage 是 PyTorch 中用于存储张量(tensor)数据的底层存储类。TypedStorage 是针对特定数据类型(如 Float、Double、Int 等)的存储,而 UntypedStorage 是一种通用的存储类型。

这个警告并不影响代码的使用,暂时忽略即可,但如果有条件的可以按照提示进行切换。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/531668.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Kubernetes(k8s)监控与报警(qq邮箱+钉钉):Prometheus + Grafana + Alertmanager(超详细)

Kubernetes&#xff08;k8s&#xff09;监控与报警&#xff08;qq邮箱钉钉&#xff09;&#xff1a;Prometheus Grafana Alertmanager&#xff08;超详细&#xff09; 1、部署环境2、基本概念简介2.1、Prometheus简介2.2、Grafana简介2.3、Alertmanager简介2.4、Prometheus …

【leetcode】动态规划::前缀和(二)

标题&#xff1a;【leetcode】前缀和&#xff08;二&#xff09; 水墨不写bug 正文开始&#xff1a; &#xff08;一&#xff09; 和为K的子数组 给你一个整数数组 nums 和一个整数 k &#xff0c;请你统计并返回 该数组中和为 k 的子数组的个数 。 子数组是数组中元素的连续…

C#开发 之 解决win11缩放导致的字体模糊问题

现在我们的笔记本电脑分辨率很高&#xff0c;基本上能达到1920*1080以上&#xff0c;目前普遍使用的显示器都已经达到了2K到4K的级别。 但是因为我们的笔记本的屏幕小&#xff0c;在非常高的分辨率下&#xff0c;一切看着都很小&#xff0c;尤其是文字&#xff0c;根本看不清&…

【Linux基础IO】

【Linux基础IO】 C文件接口回顾test.c 写文件test.c读文件> 和 >> 理解文件stdin & stdout & stderr标准输入&#xff08;stdin&#xff09;标准输出&#xff08;stdout&#xff09;标准错误输出&#xff08;stderr&#xff09; 系统文件I/O接口介绍openpathn…

什么是redis? 如何在SpringBoot中集成和操作redis?

喜欢就点击上方关注我们吧&#xff01; 本篇将带你快速了解什么是redis&#xff0c;以及学会如何在SpringBoot工程下集成和操作redis数据库。 一、概述 1、定义 Redis是一个基于内存的key-value 结构数据库。 1&#xff09;特点&#xff1a; 1、基于内存存储&#xff0c;读写性…

docker pull镜像的时候指定arm平台

指定arm平台 x86平台下载arm平台的镜像包 以mysql镜像为例 docker pull --platform linux/arm64 mysqldocker images查看镜像信息 要查看Docker镜像的信息&#xff0c;可以使用docker inspect命令。这个命令会返回镜像的详细信息&#xff0c;包括其元数据和配置。 docker i…

Canvas拖动图片效果

效果预览 代码 <!DOCTYPE html> <html><head><meta charset"utf-8" /><title>mouse event</title></head><body><div><canvasid"cs"width"800"height"400"style"bord…

doss攻击为什么是无解的?

这个让Google、亚马逊等实力巨头公司也无法避免的攻击。可以这么说&#xff0c;是目前最强大、最难防御的攻击之一&#xff0c;属于世界级难题&#xff0c;并且没有解决办法。 Doss攻击的原理不复杂&#xff0c;就是利用大量肉鸡仿照真实用户行为&#xff0c;使目标服务器资源…

CSS导读 (复合选择器 下)

&#xff08;大家好&#xff0c;今天我们将继续来学习CSS的相关知识&#xff0c;大家可以在评论区进行互动答疑哦~加油&#xff01;&#x1f495;&#xff09; 目录 2.5 伪类选择器 2.6 链接伪类选择器 2.6.1 链接伪类注意事项 2.6.2 链接伪类选择器实际开发中的写法 2.7 …

每日一题 第八十九期 洛谷 [NOIP2017 提高组] 奶酪

[NOIP2017 提高组] 奶酪 题目背景 NOIP2017 提高组 D2T1 题目描述 现有一块大奶酪&#xff0c;它的高度为 h h h&#xff0c;它的长度和宽度我们可以认为是无限大的&#xff0c;奶酪中间有许多半径相同的球形空洞。我们可以在这块奶酪中建立空间坐标系&#xff0c;在坐标系…

Redis-缓存击穿-逻辑过期

Redis-缓存击穿-逻辑过期实现 缓存击穿&#xff1a;也称热点key问题&#xff0c;大量访问一个key&#xff0c;而这个key恰巧到期了&#xff0c;导致大量的请求访问数据库。增大数据库的负担。为了解决这个问题可以采用互斥锁或逻辑过期的方式解决。本章采用逻辑过期的方式解决…

Golang笔记(下)

Golang学习笔记&#xff08;下&#xff09; 前篇&#xff1a;Golang学习笔记(上) 十四、错误处理 14.1使用error类型 func New(text string) error例子&#xff1a; package mainimport ("errors" // 导入errors包"fmt" )func main() {var number, divi…

【数据结构】树与二叉树遍历算法的应用(求叶子节点个数、求树高、复制二叉树、创建二叉树、二叉树存放表达式、交换二叉树每个结点的左右孩子)

目录 求叶子节点个数、求树高、复制二叉树、创建二叉树、二叉树存放表达式、交换二叉树每个结点的左右孩子应用一&#xff1a;统计二叉树中叶子结点个数的算法写法一&#xff1a;使用静态变量写法二&#xff1a;传入 count 作为参数写法三&#xff1a;不使用额外变量 应用二&am…

【Linux】socket编程2

欢迎来到Cefler的博客&#x1f601; &#x1f54c;博客主页&#xff1a;折纸花满衣 &#x1f3e0;个人专栏&#xff1a;题目解析 目录 &#x1f449;&#x1f3fb;客户端代码Makefile(生成目标文件)UdpClient.cc(客户端代码)服务端代码部分优化1&#xff08;接受客户端时显示客…

基于51单片机低中高音7键电子琴音乐播放器

基于51单片机电子琴音乐播放器 &#xff08;仿真&#xff0b;程序&#xff0b;原理图&#xff0b;PCB&#xff0b;设计报告&#xff09; 功能介绍 具体功能&#xff1a; 1.可以使用按键切换音乐播放模式和弹奏模式&#xff1b; 2.LED灯显示在使用哪种模式&#xff1b; 3.音乐…

Redis部署之主从

使用两台云服务器&#xff0c;在 Docker 下部署。 Redis版本为&#xff1a;7.2.4 下载并配置redis 配置文件 下载 wget -c http://download.redis.io/redis-stable/redis.conf配置 master节点配置 bind 0.0.0.0 # 使得Redis服务器可以跨网络访问,生产环境请考虑…

第十四届蓝桥杯C/C++大学B组题解(二)

6、岛屿个数 #include <bits/stdc.h> using namespace std; const int M51; int T,m,n; int vis[M][M],used[M][M]; int dx[]{1,-1,0,0,1,1,-1,-1}; int dy[]{0,0,1,-1,1,-1,1,-1}; string mp[M]; struct node{//记录一点坐标 int x,y; }; void bfs_col(int x,int y){ qu…

C++ | Leetcode C++题解之第14题最长公共前缀

题目&#xff1a; 题解&#xff1a; class Solution { public:string longestCommonPrefix(vector<string>& strs) {if (!strs.size()) {return "";}int minLength min_element(strs.begin(), strs.end(), [](const string& s, const string& t)…

同步压缩理论

参考 在频率方向进行能量重新分配&#xff08;分配到中心&#xff09; 时频重排

实验4 DHCP基础配置

实验4 DHCP基础配置 一、 原理描述二、 实验目的三、 实验内容四、 实验配置五、 实验步骤1.基本配置2.配置DHCPServer功能3.配置DHCP Client 一、 原理描述 动态主机配置协议 DHCP是一个局域网的网络协议&#xff0c;使用UDP协议工作&#xff0c;主要有两个用途&#xff1a;用…