milvus upsert流程源码分析

milvus版本:v2.3.2

整体架构:

在这里插入图片描述

Upsert 的数据流向:

在这里插入图片描述

1.客户端sdk发出Upsert API请求。

import numpy as np
from pymilvus import (
    connections,
    Collection,
)

num_entities, dim = 4, 3

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")

hello_milvus = Collection("hello_milvus")

print("Start upsert entities")
rng = np.random.default_rng(seed=19530)
entities = [
    [0,1,2,4000],
    [10,11,12,4000],
    rng.random((num_entities, dim)),
]
hello_milvus.upsert(entities)

2.服务端接受API请求,将request封装为upsertTask,并压入dmQueue队列。

注意这里是dmQueue。DDL类型的是ddQueue。

代码路径:internal\proxy\impl.go

// Upsert upsert records into collection.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
	......
    // request封装为upsertTask
	it := &upsertTask{
		baseMsg: msgstream.BaseMsg{
			HashValues: request.HashKeys,
		},
		ctx:       ctx,
		Condition: NewTaskCondition(ctx),
		req:       request,
		result: &milvuspb.MutationResult{
			Status: merr.Success(),
			IDs: &schemapb.IDs{
				IdField: nil,
			},
		},

		idAllocator:   node.rowIDAllocator,
		segIDAssigner: node.segAssigner,
		chMgr:         node.chMgr,
		chTicker:      node.chTicker,
	}

	......
    // 将task压入dmQueue队列
	if err := node.sched.dmQueue.Enqueue(it); err != nil {
		......
	}

	......
    // 等待任务执行完
	if err := it.WaitToFinish(); err != nil {
		......
	}

	......
}

3.执行upsertTask的3个方法PreExecute、Execute、PostExecute。

PreExecute()一般为参数校验等工作。

Execute()为真正执行逻辑。

PostExecute()执行完后的逻辑,什么都不做,返回nil。

代码路径:internal\proxy\task_upsert.go

func (it *upsertTask) Execute(ctx context.Context) (err error) {
	ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
	defer sp.End()
	log := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))

	tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))
	// 拿到stream,类型为msgstream.mqMsgStream
    stream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
	if err != nil {
		return err
	}
    // 创建msgPack
	msgPack := &msgstream.MsgPack{
		BeginTs: it.BeginTs(),
		EndTs:   it.EndTs(),
	}
    // 添加insertMsgPack
	err = it.insertExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to insertExecute", zap.Error(err))
		return err
	}
    // 添加deleteMsgPack
	err = it.deleteExecute(ctx, msgPack)
	if err != nil {
		log.Warn("Fail to deleteExecute", zap.Error(err))
		return err
	}

	tr.RecordSpan()
    // 发送数据至mq
	err = stream.Produce(msgPack)
	if err != nil {
		it.result.Status = merr.Status(err)
		return err
	}
	sendMsgDur := tr.RecordSpan()
	metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel).Observe(float64(sendMsgDur.Milliseconds()))
	totalDur := tr.ElapseSpan()
	log.Debug("Proxy Upsert Execute done", zap.Int64("taskID", it.ID()),
		zap.Duration("total duration", totalDur))
	return nil
}

msgPack变量:

在这里插入图片描述

msgPack包含了insertRequest和deleteRequest。

在这里插入图片描述

insertRequest包含了客户端的upsert数据,以及还会有rowid,用来唯一标识一列数据。

在这里插入图片描述

deleteRequest包含主键值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/416362.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录刷题训练营day25:LeetCode(216)组合总和III、LeetCode(17)电话号码的字母组合

代码随想录刷题训练营day25:LeetCode(40)组合总和 II、LeetCode(216)组合总和III、LeetCode(17)电话号码的字母组合 LeetCode(40)组合总和 II 题目 代码 import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util…

lv19 多态 4

1 虚函数 虚函数&#xff08; 基类指针可指向派生类对象&#xff0c; 动态联编&#xff09; 先看示例&#xff0c;不加virtual&#xff0c;不认对象认指针。 #include <iostream>using namespace std;class A{ public:A(){ }~A(){ }void show(){cout<<"AAA…

火灾安全护航:火灾监测报警摄像机助力建筑安全

火灾是建筑安全中最常见也最具破坏力的灾难之一&#xff0c;为了及时发现火灾、减少火灾造成的损失&#xff0c;火灾监测报警摄像机应运而生&#xff0c;成为建筑防火安全的重要技术装备。 火灾监测报警摄像机采用高清晰度摄像头和智能识别系统&#xff0c;能够全天候监测建筑内…

LeetCode #104 二叉树的最大深度

104. 二叉树的最大深度 题目 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;3 示例 2&#xff1a; 输入&#xff1a;root [1,null,2] 输出&#xff1a;2 分析 …

程序员缺乏经验的 7 种表现!

程序员缺乏经验的 7 种表现&#xff01; 一次性提交大量代码 代码写的很烂 同时开展多项工作 性格傲慢 不能从之前的错误中学到经验 工作时间处理私人事务 盲目追逐技术潮流 知道这些表现&#xff0c;你才能在自己的程序员职业生涯中不犯相同的错误。 软件行业的工作经…

Zookeeper基础入门-2【ZooKeeper 分布式锁案例】

Zookeeper基础入门-2【ZooKeeper 分布式锁案例】 四、ZooKeeper-IDEA环境搭建4.1.环境搭建4.1.1.创建maven工程&#xff1a;zookeeper4.1.2.在pom文件添加依赖4.1.3.在项目的src/main/resources 目录下&#xff0c;新建文件为“log4j.properties”4.1.4.创建包名com.orange.zk …

springboot 注解属性转换字典

1.注解相关功能实现 定义属性注解 import com.fasterxml.jackson.annotation.JacksonAnnotationsInside; import com.fasterxml.jackson.databind.annotation.JsonSerialize; import com.vehicle.manager.core.serializer.DicSerializer;import java.lang.annotation.*;/*** a…

c++学习记录 deque容器—数据存取

函数原型&#xff1a; at(int idx); //返回索引idx所指的数据operator[]; //返回索引idx所指的数据front(); //返回容器中第一个数据元素back(); //返回容器中最后一个数据元素 #include<iostream> using nam…

websocket在django中的运用

14-2 聊天室实现思路&#xff1a;轮训、长轮训、websocket_哔哩哔哩_bilibili 参考大佬的B站学习笔记 https://www.cnblogs.com/wupeiqi/p/6558766.html 参考博客 https://www.cnblogs.com/wupeiqi/articles/9593858.html 参考博客 http协议: 是短连接&#xff0c;无状态…

【LeetCode】每日一题:使二叉树所有路径值相等的最小代价

该题采用自底向上的思路的话&#xff0c;很容易想到使用贪心的思想&#xff0c;但是如何进行具体操作却有些难度。 这里补充一个重要的结论&#xff1a;二叉树的数组形式中&#xff0c;第i个节点的父节点是i/2&#xff1b;接下来只需要让自底向上让每个路径上的代价保持最低限…

Python实现自动检测设备连通性并发送告警到企业微信

背景&#xff1a;门禁机器使用的WiFi连接&#xff0c;因为某些原因会不定期自动断开连接&#xff0c;需要人工及时干预&#xff0c;以免影响门禁数据同步&#xff0c;故写此脚本&#xff0c;定时检测门禁网络联通性。 #首次使用要安装tcping模块 pip install tcpingfrom tcpin…

Jupyter Notebook 下载+简单设置

这里写目录标题 1. Jupyter Notebook安装2.切换打开别的盘3. 创建代码文件4.为jupyter notebook添加目录 (Jupyter安装拓展nbextensions)step1&#xff1a;安装命令step2&#xff1a;用户配置step3&#xff1a;上述过程均完成后&#xff0c;打开jupyter notebook就会发现界面多…

静态方式部署集中式网关

在静态方式部署集中式网关的场景中&#xff0c;控制平面的流程包括VXLAN隧道建立、MAC地址动态学习&#xff1b;转发平面的流程包括同子网已知单播报文转发、同子网BUM&#xff08;Broadcast&Unknown-unicast&Multicast&#xff09;报文转发、跨子网报文转发。 静态方…

qml 项目依赖

文章目录 出现的问题最终对比下一步 把 apptestQml3_6.exe 放到一个单独目录下&#xff0c;执行 windeployqt.exe ./apptestQml3_6.exe但是出了很多问题&#xff0c;根本运行不起来。 但是在release目录下执行下&#xff0c;程序能跑起来。 根据错误提示&#xff0c;进行添加。…

vue3+vite+ts配置多个代理并解决报404问题

之前配置接口代理总是报404,明明接口地址是对的但还是报是因数写法不对;用了vue2中的写法 pathRewrite改为rewrite 根路径下创建env文件根据自己需要名命 .env.development文件内容 # just a flag ENVdevelopment# static前缀 VITE_APP_PUBLIC_PREFIX"" # 基础模块…

php 支持mssqlserver

系统不支持:sqlsrv 需要一下几个环节 1.准备检测php版本 查看 VC 版本 查看操作系统位数&#xff1a;X86(32位) 和X64 2.下载php的sqlserver库 extensionphp_sqlsrv_74_nts_x64.dll extensionphp_pdo_sqlsrv_74_nts_x64.dll extensionphp_sqlsrv_74_nts_x64 extensionphp_…

基于Vue的高校课程考勤成绩管理系统SpringBoot+nodejs+python

设计目标&#xff1a; 课程管理系统开发的目的是管理全校开设课程的基本信息&#xff0c;安排各班级的课程以及上课时间和教室。系统的使用对象包括教务,学生、教师、管理员等。通过对日常课程管理工作的分析&#xff0c;可以将课程管理系统的功能分为下面几个方面&#xff1a…

C++ //练习10.3 用accumulate求一个vector<int>中的元素之和。

C Primer&#xff08;第5版&#xff09; 练习 10.3 练习10.3 用accumulate求一个vector中的元素之和。 环境&#xff1a;Linux Ubuntu&#xff08;云服务器&#xff09; 工具&#xff1a;vim 代码块 /*******************************************************************…

DataGrip2023配置连接Mssqlserver、Mysql、Oracle若干问题解决方案

1、Mssqlserver连接 本人连的是Sql2008&#xff0c;默认添加时&#xff0c;地址、端口、实例、账号、密码后&#xff0c;测试连接出现错误。 Use SSL&#xff1a;不要勾选 VM option&#xff1a;填写&#xff0c;"-Djdk.tls.disabledAlgorithmsSSLv3, RC4, DES, MD5withR…

(Linux学习二)文件管理基础操作命令笔记

Linux目录结构&#xff1a; bin 二进制文件 boot 启动目录 home 普通用户 root 超管 tmp 临时文件 run 临时运行数据 var 日志 usr 应用程序、文件 etc 配置文件 dev 文件系统 一、基础操作 在 Linux 终端中&#xff0c;你可以使用以下命令来清屏&#xff1a; clear 命令&am…