milvus对象存储和消息中间件的工厂设计模式分析

milvus对象存储和消息中间件的工厂设计模式分析

需求

根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote

配置文件

根据配置文件选择初始化mq和存储:

mq:
  type: pulsar
  
common:
  storageType: minio

对于这种类型一个是mq,一个是存储,相比工厂方法设计模式,使用抽象工厂设计模式更合理。

代码框架

在这里插入图片描述

工厂接口

代码路径:internal\util\dependency\factory.go

type Factory interface {
	msgstream.Factory
    // Init()给工厂传递参数。
	Init(p *paramtable.ComponentParam)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

// pkg\mq\msgstream\msgstream.go
// msgstream.Factory的code
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。

这个接口创建消息中间件对象和持久存储对象。

这里为什么不这么写:

type Factory interface {
    Init(p *paramtable.ComponentParam)
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}

DefaultFactory

DefaultFactory结构体是dependency.Factory的实现。

// DefaultFactory is a factory that produces instances of storage.ChunkManager and message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {
	standAlone          bool
	chunkManagerFactory storage.Factory
	msgStreamFactory    msgstream.Factory
}

// storage.Factory
// internal\storage\factory.go
type Factory interface {
	NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}

// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

DefaultFactory实现了dependency.Factory接口的Init()函数。

在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。

func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
	// skip if using default factory
	if f.msgStreamFactory != nil {
		return
	}
    // 初始化chunkManagerFactory
	f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)

	// initialize mq client or embedded mq.
    // 初始化msgStreamFactory
	if err := f.initMQ(f.standAlone, params); err != nil {
		panic(err)
	}
}

f.chunkManagerFactory:

return &ChunkManagerFactory{
		persistentStorage: persistentStorage,
		config:            c,
	}

f.msgStreamFactory:

func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
	mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})
	log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))

	switch mqType {
	case mqTypeNatsmq:
		f.msgStreamFactory = msgstream.NewNatsmqFactory()
	case mqTypeRocksmq:
		f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), &params.ServiceParam)
	case mqTypePulsar:
		f.msgStreamFactory = msgstream.NewPmsFactory(&params.ServiceParam)
	case mqTypeKafka:
		f.msgStreamFactory = msgstream.NewKmsFactory(&params.ServiceParam)
	}
	if f.msgStreamFactory == nil {
		return errors.New("failed to create MQ: check the milvus log for initialization failures")
	}
	return nil
}

持久存储

storage.Factory是创建持久存储的工厂接口。

storage.ChunkManagerFactory是storage.Factory的实现。

NewPersistentStorageChunkManager()接口的实现:

func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
	return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}

func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {
	return f.newChunkManager(ctx, f.persistentStorage)
}

func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {
	switch engine {
	case "local":
		return NewLocalChunkManager(RootPath(f.config.rootPath)), nil
	case "minio":
		return newMinioChunkManagerWithConfig(ctx, f.config)
	case "remote":
		return NewRemoteChunkManager(ctx, f.config)
	default:
		return nil, errors.New("no chunk manager implemented with engine: " + engine)
	}
}

根据传入的engine新建对应的持久存储对象。

LocalChunkManager、MinioChunkManager、RemoteChunkManager。

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage
	bucketName string
	rootPath   string
}

消息中间件

msgstream.Factory是创建mq的工厂接口。

工厂接口:

// pkg\mq\msgstream\msgstream.go
type Factory interface {
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}

实现有:

CommonFactory、KmsFactory、PmsFactory

// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {
	Newer             func(context.Context) (mqwrapper.Client, error) // client constructor
	DispatcherFactory ProtoUDFactory
	ReceiveBufSize    int64
	MQBufSize         int64
}

// pkg\mq\msgstream\mq_factory.go
// kafka工厂
type KmsFactory struct {
	dispatcherFactory ProtoUDFactory
	config            *paramtable.KafkaConfig
	ReceiveBufSize    int64
	MQBufSize         int64
}

// PmsFactory is a pulsar msgstream factory that implemented Factory interface(msgstream.go)
// pkg\mq\msgstream\mq_factory.go
// pulsar工厂
type PmsFactory struct {
	dispatcherFactory ProtoUDFactory
	// the following members must be public, so that mapstructure.Decode() can access them
	PulsarAddress    string
	PulsarWebAddress string
	ReceiveBufSize   int64
	MQBufSize        int64
	PulsarAuthPlugin string
	PulsarAuthParams string
	PulsarTenant     string
	PulsarNameSpace  string
	RequestTimeout   time.Duration
	metricRegisterer prometheus.Registerer
}

mq产品

mq的产品接口是msgstream.MsgStream

// MsgStream is an interface that can be used to produce and consume message on message queue
type MsgStream interface {
	Close()

	AsProducer(channels []string)
	Produce(*MsgPack) error
	SetRepackFunc(repackFunc RepackFunc)
	GetProduceChannels() []string
	Broadcast(*MsgPack) (map[string][]MessageID, error)

	AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) error
	Chan() <-chan *MsgPack
	Seek(ctx context.Context, offset []*MsgPosition) error

	GetLatestMsgID(channel string) (MessageID, error)
	CheckTopicValid(channel string) error

	EnableProduce(can bool)
}

具体产品实现有:

msgstream.mqMsgStream、msgstream.MqTtMsgStream

type mqMsgStream struct {
	ctx              context.Context
	client           mqwrapper.Client
	producers        map[string]mqwrapper.Producer
	producerChannels []string
	consumers        map[string]mqwrapper.Consumer
	consumerChannels []string

	repackFunc    RepackFunc
	unmarshal     UnmarshalDispatcher
	receiveBuf    chan *MsgPack
	closeRWMutex  *sync.RWMutex
	streamCancel  func()
	bufSize       int64
	producerLock  *sync.RWMutex
	consumerLock  *sync.Mutex
	closed        int32
	onceChan      sync.Once
	enableProduce atomic.Value
}

// MqTtMsgStream is a msgstream that contains timeticks
type MqTtMsgStream struct {
	*mqMsgStream
	chanMsgBuf         map[mqwrapper.Consumer][]TsMsg
	chanMsgPos         map[mqwrapper.Consumer]*msgpb.MsgPosition
	chanStopChan       map[mqwrapper.Consumer]chan bool
	chanTtMsgTime      map[mqwrapper.Consumer]Timestamp
	chanMsgBufMutex    *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup      *sync.WaitGroup
	lastTimeStamp      Timestamp
	syncConsumer       chan int
}

存储产品

存储的产品接口是storag.ChunkManagere

// ChunkManager is to manager chunks.
// Include Read, Write, Remove chunks.
type ChunkManager interface {
	// RootPath returns current root path.
	RootPath() string
	// Path returns path of @filePath.
	Path(ctx context.Context, filePath string) (string, error)
	// Size returns path of @filePath.
	Size(ctx context.Context, filePath string) (int64, error)
	// Write writes @content to @filePath.
	Write(ctx context.Context, filePath string, content []byte) error
	// MultiWrite writes multi @content to @filePath.
	MultiWrite(ctx context.Context, contents map[string][]byte) error
	// Exist returns true if @filePath exists.
	Exist(ctx context.Context, filePath string) (bool, error)
	// Read reads @filePath and returns content.
	Read(ctx context.Context, filePath string) ([]byte, error)
	// Reader return a reader for @filePath
	Reader(ctx context.Context, filePath string) (FileReader, error)
	// MultiRead reads @filePath and returns content.
	MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
	ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)
	// ReadWithPrefix reads files with same @prefix and returns contents.
	ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)
	Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
	// ReadAt reads @filePath by offset @off, content stored in @p, return @n as the number of bytes read.
	// if all bytes are read, @err is io.EOF.
	// return other error if read failed.
	ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)
	// Remove delete @filePath.
	Remove(ctx context.Context, filePath string) error
	// MultiRemove delete @filePaths.
	MultiRemove(ctx context.Context, filePaths []string) error
	// RemoveWithPrefix remove files with same @prefix.
	RemoveWithPrefix(ctx context.Context, prefix string) error
}

具体产品实现有:

LocalChunkManager、MinioChunkManager、RemoteChunkManager

// LocalChunkManager is responsible for read and write local file.
type LocalChunkManager struct {
	localPath string
}

// MinioChunkManager is responsible for read and write data stored in minio.
type MinioChunkManager struct {
	*minio.Client

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

// RemoteChunkManager is responsible for read and write data stored in minio.
type RemoteChunkManager struct {
	client ObjectStorage

	//	ctx        context.Context
	bucketName string
	rootPath   string
}

总结

从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/576087.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

kubernetes部署控制器Deployment

一、概念 在学习rc和rs控制器资源时&#xff0c;这两个资源都是控制pod的副本数量的&#xff0c;但是&#xff0c;他们两个有个缺点&#xff0c;就是在部署新版本pod或者回滚代码的时候&#xff0c;需要先apply资源清单&#xff0c;然后再删除现有pod&#xff0c;通过资源控制&…

接口测试和Mock学习路线(上)

一、接口测试和Mock学习路线-第一阶段&#xff1a; 掌握接口测试的知识体系与学习路线掌握面试常见知识点之 HTTP 协议掌握常用接口测试工具 Postman掌握常用抓包工具 Charles 与 Fiddler结合知名产品实现 mock 测试与接口测试实战练习 1.接口协议&#xff1a; 需要先了解 O…

Vue3 + Element-Plus 对接高德地图实现搜索提示选址、点击地图选址、自我定位功能(最新)

Vue3 Element-Plus 对接高德地图实现搜索提示选址、点击地图选址、自我定位功能&#xff08;最新&#xff09; 1、效果展示2、实现代码2.1 GaoDeMap.vue2.2 SystemDialog.vue2.3 UnusedList.vue.vue 1、效果展示 2、实现代码 2.1 GaoDeMap.vue <template><div style…

【个人博客搭建】(11)swagger添加jwt信息

这个主要是为了方便使用swagger时&#xff0c;能更好的带入我们的token。 ps&#xff1a;如果使用其他第三方api工具&#xff08;apipost、postman等&#xff09;则不需要。 &#xff08;当然&#xff0c;不用不能没有&#xff0c;是吧&#xff09; 1、在AddSwaggerGen内添加…

文件权限管理

文件权限管理 1. 权限对象 权限对象含义u属主&#xff0c;所有者g属组o其他人 2. 权限类型 权限类型含义值r读权限4w写权限2x执行权限1 3. 修改文件属主及属组 命令:chown(change own)更改文件或目录属主与属组名 3.1 修改文件属主与属组 只修改属主&#xff1a;chown $…

【数据结构】链表的中间节点

给你单链表的头结点 head &#xff0c;请你找出并返回链表的中间结点。 如果有两个中间结点&#xff0c;则返回第二个中间结点。 Definition for singly-linked list.struct ListNode {int val;struct ListNode *next;};typedef struct ListNode ListNode; struct ListNode…

美国服务器vs香港服务器,哪个网站部署打开更快一些?

网站打开速度受多种因素影响&#xff0c;包括服务器地理位置、网络质量、带宽等。用户距离服务器越近&#xff0c;访问速度越快。对于中国大陆用户而言&#xff0c;香港的服务器可能会提供更快的网站访问体验&#xff0c;因为香港距离大陆较近&#xff0c;且网络连接通常较好。…

python高阶函数:zip()

概述与基本用法 zip() 是 Python 内置函数之一&#xff0c;用于将多个可迭代对象打包成一个元组序列&#xff0c;然后返回一个迭代器。它可以接受任意数量的可迭代对象作为参数&#xff0c;并将它们的元素按顺序一一对应地打包成元组。 以下是 zip() 函数的基本用法&#xff…

2024年视频号小店来了,这次是不是新的电商风口?

大家好&#xff0c;我是电商糖果 2024年电商行业可以说大地震了&#xff0c;为什么这么说呢&#xff1f; 因为一个非常有实力的新平台出现了。 它就是微信视频号推出的视频号小店&#xff0c;也可以理解为腾讯旗下的电商平台。 视频号的出现是腾讯为了对标抖音&#xff0c;和…

使用LSTM网络实现文本情感分析

一、实验目的&#xff1a; 理解循环神经网络的基本概念和原理&#xff1b;了解循环神经网络处理文本数据的基本方法&#xff1b;掌握循环神经网络处理文本数据的实践方法&#xff0c;并实现文本情感分析任务。 实验要求&#xff1a; 使用Keras框架定义并训练循环神经网络模型…

链游:未来游戏发展的新风向

链游&#xff0c;即区块链游戏的一种&#xff0c;是一种将区块链技术与游戏玩法相结合的创新型游戏。它利用区块链技术的特性&#xff0c;如去中心化、可追溯性和安全性&#xff0c;为玩家提供了一种全新的游戏体验。链游通常采用智能合约来实现游戏的规则和交易系统&#xff0…

B站无限评论暴力截留协议及教程

B站无限评论暴力截留协议及教程 B站无限评论暴力截留协议及教程&#xff0c;需要抓CK &#xff0c;教程里面有讲如何抓取 网盘自动获取 链接&#xff1a;https://pan.baidu.com/s/1lpzKPim76qettahxvxtjaQ?pwd0b8x 提取码&#xff1a;0b8x

森林消防隔膜泵的应用与前景——恒峰智慧科技

随着全球气候变暖&#xff0c;森林火灾频发&#xff0c;给生态环境和人类安全带来严重威胁。为有效应对这一挑战&#xff0c;森林消防领域不断引入新技术、新装备。其中&#xff0c;隔膜泵作为一种高效、可靠的消防设备&#xff0c;正逐渐受到广泛关注。本文将探讨森林消防隔膜…

c++在visual studio上的默认配置

右键 新建项 右键源文件 属性

5、Flink事件时间之Watermark详解

1&#xff09;生成 Watermark 1.Watermark 策略简介 为了使用事件时间语义&#xff0c;Flink 应用程序需要知道事件时间戳对应的字段&#xff0c;即数据流中的每个元素都需要拥有可分配的事件时间戳。 通过使用 TimestampAssigner API 从元素中的某个字段去访问/提取时间戳。…

Gitflow实操以及代码审查Pull Request操作

1.背景 之前一直有用过gitflow&#xff0c;但是一直没有归纳技术&#xff0c;另一方面也是每个团队用到的gitflow都不一致。而最近做项目要用gitflow&#xff0c;趁此机会分享一下gitflow的操作。 2.gitflow介绍 用git一直有一个问题&#xff0c;就是怎么保证代码稳定性&…

LeetCode in Python 48. Rotate Image/Matrix (旋转图像/矩阵)

旋转图像/矩阵的重点是寻找旋转前后对应位置的坐标关系。 示例&#xff1a; 图1 旋转图像/矩阵的输入输出示意图 代码&#xff1a; class Solution:def rotate(self, matrix):n len(matrix)for i in range(n // 2):for j in range(i, n - 1 - i):topleft matrix[i][j]ma…

ArcGIS小技巧——由图片创建点符号

一张合格的专题地图&#xff0c;除了内容的准确性和丰富性以外&#xff0c;美观性也是必不可少的。而Arcgis符号库中的符号非常有限&#xff0c;有时并不能很好的展现出地图要素的特点。因此&#xff0c;学会自定义符号就显得尤其重要了。今天&#xff0c;小编将结合实例&#…

AI大模型探索之路-训练篇4:大语言模型训练数据集概览

文章目录 前言一、常用的预训练数据集1、网页2、书籍3、维基百科4、代码5、混合型数据集 二、常用微调数据集1、指令微调数据集1.1 自然语言处理任务数据集1.2 日常对话数据集1.3 合成数据集 2、人类对齐数据集 前言 在人工智能领域&#xff0c;构建强大的AI系统的关键步骤之一…

synchronized 之谜

序言 本文给大家介绍一下 synchronized 关键字的部分原理。 一、内存中的 Java 对象 class A {private String attr; }先引入一个问题&#xff1a;上面类 A 有一个属性 attr。当类 A 实例化之后的对象在内存中是如何表示的呢&#xff1f; 在内存中&#xff0c;Java 对象由三…