Milvus 版本:v2.3.2
整体架构:
Upsert 的数据流向:
1. 客户端 SDK(以下示例使用 pymilvus)发出 Upsert API 请求,示例代码如下:
import numpy as np
from pymilvus import (
connections,
Collection,
)
# Demo: upsert 4 rows into the existing "hello_milvus" collection.
# NOTE(review): assumes the collection schema is (primary key, scalar field,
# float-vector field of dimension `dim`), in that order — confirm against the
# collection definition.
num_entities, dim = 4, 3

print("start connecting to Milvus")
connections.connect("default", host="192.168.230.71", port="19530")
hello_milvus = Collection("hello_milvus")

print("Start upsert entities")
rng = np.random.default_rng(seed=19530)

# Column-oriented payload: one list/array per field, each with num_entities rows.
pk_column = [0, 1, 2, 4000]
scalar_column = [10, 11, 12, 4000]
vector_column = rng.random((num_entities, dim))  # shape (num_entities, dim)
entities = [pk_column, scalar_column, vector_column]

hello_milvus.upsert(entities)
2. 服务端(Proxy)接收 API 请求,将 request 封装为 upsertTask,并压入 dmQueue 队列。
注意这里压入的是 dmQueue(DML 请求队列);DDL 类型的请求则压入 ddQueue。
代码路径:internal/proxy/impl.go
// Upsert upserts records into a collection.
//
// Excerpt from internal/proxy/impl.go. Each "......" line marks code omitted
// from this note, so the snippet is not compilable as shown. The visible flow:
// wrap the request in an upsertTask, enqueue it on the scheduler's dmQueue
// (the DML queue; DDL requests go to ddQueue instead), then block until the
// task has finished.
func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) (*milvuspb.MutationResult, error) {
......
// Wrap the incoming request in an upsertTask.
it := &upsertTask{
baseMsg: msgstream.BaseMsg{
// Hash values are taken directly from the client request's HashKeys.
HashValues: request.HashKeys,
},
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: request,
// The result starts as a Success status with an empty ID set.
// NOTE(review): presumably filled in / overwritten while the task runs — confirm.
result: &milvuspb.MutationResult{
Status: merr.Success(),
IDs: &schemapb.IDs{
IdField: nil,
},
},
// Proxy-owned helpers handed to the task. chMgr is used during Execute to
// obtain the collection's DML stream; the other roles are inferred from
// names only (row-ID allocation, segment assignment) — TODO confirm.
idAllocator: node.rowIDAllocator,
segIDAssigner: node.segAssigner,
chMgr: node.chMgr,
chTicker: node.chTicker,
}
......
// Enqueue the task on dmQueue (DML queue), not on the DDL ddQueue.
if err := node.sched.dmQueue.Enqueue(it); err != nil {
......
}
......
// Block until the task has run through PreExecute, Execute and PostExecute.
if err := it.WaitToFinish(); err != nil {
......
}
......
}
3. 依次执行 upsertTask 的 3 个方法:PreExecute、Execute、PostExecute。
PreExecute():一般负责参数校验等准备工作。
Execute():真正的执行逻辑。
PostExecute():执行完成后的收尾逻辑;对 upsertTask 而言什么都不做,直接返回 nil。
代码路径:internal/proxy/task_upsert.go
// Execute sends the upsert to the message queue as a single MsgPack.
//
// The pack spans [BeginTs, EndTs] of the task. It is filled first by
// insertExecute and then by deleteExecute, and is finally produced on the
// collection's DML stream. The send latency is reported to the
// ProxySendMutationReqLatency metric under the upsert label.
//
// Only a Produce failure is written into it.result.Status here. Errors from
// obtaining the stream or from insertExecute/deleteExecute are returned
// unchanged. NOTE(review): presumably their status is set elsewhere — confirm.
func (it *upsertTask) Execute(ctx context.Context) (err error) {
	ctx, span := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Upsert-Execute")
	defer span.End()

	logger := log.Ctx(ctx).With(zap.String("collectionName", it.req.CollectionName))
	recorder := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute upsert %d", it.ID()))

	// DML stream for this collection (msgstream.mqMsgStream per the notes).
	dmlStream, err := it.chMgr.getOrCreateDmlStream(it.collectionID)
	if err != nil {
		return err
	}

	pack := &msgstream.MsgPack{
		BeginTs: it.BeginTs(),
		EndTs:   it.EndTs(),
	}

	// Insert messages are added to the pack first, then delete messages.
	if err = it.insertExecute(ctx, pack); err != nil {
		logger.Warn("Fail to insertExecute", zap.Error(err))
		return err
	}
	if err = it.deleteExecute(ctx, pack); err != nil {
		logger.Warn("Fail to deleteExecute", zap.Error(err))
		return err
	}

	// Mark a span boundary right before sending. The following RecordSpan
	// then presumably yields only the Produce duration — confirm against timerecord.
	recorder.RecordSpan()
	if err = dmlStream.Produce(pack); err != nil {
		it.result.Status = merr.Status(err)
		return err
	}
	sendDur := recorder.RecordSpan()

	nodeLabel := strconv.FormatInt(paramtable.GetNodeID(), 10)
	metrics.ProxySendMutationReqLatency.
		WithLabelValues(nodeLabel, metrics.UpsertLabel).
		Observe(float64(sendDur.Milliseconds()))

	elapsed := recorder.ElapseSpan()
	logger.Debug("Proxy Upsert Execute done",
		zap.Int64("taskID", it.ID()),
		zap.Duration("total duration", elapsed))
	return nil
}
msgPack 变量说明:
msgPack 同时包含 insertRequest 和 deleteRequest。
insertRequest 包含客户端 upsert 的数据,另外还带有 rowID,用来唯一标识一行数据。
deleteRequest 包含主键值,用于删除与本次 upsert 数据主键相同的旧数据。