milvus对象存储和消息中间件的工厂设计模式分析
需求
根据参数设置创建mq和storage
mq有kafka,pulsar
storage有local,minio,remote
配置文件
根据配置文件选择初始化mq和存储:
mq:
  type: pulsar
common:
  storageType: minio
这里需要创建两类产品:一类是mq,一类是存储。相比工厂方法设计模式,使用抽象工厂设计模式更合理。
代码框架
工厂接口
代码路径:internal\util\dependency\factory.go
// Factory is the dependency factory interface. It embeds msgstream.Factory
// and adds parameter initialization plus creation of the persistent-storage
// chunk manager.
type Factory interface {
	msgstream.Factory
	// Init passes the component parameters to the factory.
	Init(p *paramtable.ComponentParam)
	// NewPersistentStorageChunkManager returns a storage.ChunkManager, or an error.
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}
// pkg\mq\msgstream\msgstream.go
// Source of msgstream.Factory: the factory interface for message streams.
type Factory interface {
	// NewMsgStream returns a MsgStream, or an error.
	NewMsgStream(ctx context.Context) (MsgStream, error)
	// NewTtMsgStream returns a MsgStream, or an error.
	// NOTE(review): presumably the time-tick variant (MqTtMsgStream) — confirm.
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	// NewMsgStreamDisposer returns a func([]string, string) error.
	// NOTE(review): the meaning of the two arguments is not shown in this excerpt.
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}
dependency.Factory是一个工厂接口,里面包含了mq的工厂接口,和创建持久对象的方法。
这个接口创建消息中间件对象和持久存储对象。
这里为什么不这么写:
// Hypothetical flattened alternative that the article asks about; it is not
// the real interface. Compared with embedding msgstream.Factory, this version
// lists NewMsgStream directly and leaves out NewTtMsgStream and
// NewMsgStreamDisposer, so it no longer carries the full msgstream.Factory
// method set.
type Factory interface {
	Init(p *paramtable.ComponentParam)
	NewMsgStream(ctx context.Context) (MsgStream, error)
	NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error)
}
DefaultFactory
DefaultFactory结构体是dependency.Factory的实现。
// DefaultFactory is a factory that produces instances of storage.ChunkManager
// and of the message queue.
// internal\util\dependency\factory.go
type DefaultFactory struct {
	// standAlone is forwarded to initMQ as its standalone argument when Init runs.
	standAlone bool
	// chunkManagerFactory is assigned in Init from storage.NewChunkManagerFactoryWithParam.
	chunkManagerFactory storage.Factory
	// msgStreamFactory is assigned by initMQ; Init returns early when it is already non-nil.
	msgStreamFactory msgstream.Factory
}
// storage.Factory
// internal\storage\factory.go
// Factory is the factory interface for persistent-storage chunk managers.
type Factory interface {
	// NewPersistentStorageChunkManager returns a ChunkManager, or an error.
	NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error)
}
// msgstream.Factory
// pkg\mq\msgstream\msgstream.go
// Factory is the factory interface for message streams.
type Factory interface {
	// NewMsgStream returns a MsgStream, or an error.
	NewMsgStream(ctx context.Context) (MsgStream, error)
	// NewTtMsgStream returns a MsgStream, or an error.
	// NOTE(review): presumably the time-tick variant — confirm.
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	// NewMsgStreamDisposer returns a func([]string, string) error.
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}
DefaultFactory实现了dependency.Factory接口的Init()函数。
在Init()函数内初始化了chunkManagerFactory、msgStreamFactory。
// Init implements dependency.Factory. It builds the chunk manager factory
// from params and then sets up the message stream factory through initMQ.
// It panics if initMQ returns an error.
func (f *DefaultFactory) Init(params *paramtable.ComponentParam) {
	// A message stream factory is already in place: nothing to initialize.
	if f.msgStreamFactory != nil {
		return
	}

	// Storage side: create the chunk manager factory.
	f.chunkManagerFactory = storage.NewChunkManagerFactoryWithParam(params)

	// MQ side: initialize the mq client or the embedded mq, which sets
	// f.msgStreamFactory.
	err := f.initMQ(f.standAlone, params)
	if err != nil {
		panic(err)
	}
}
f.chunkManagerFactory:
// Excerpt only: the enclosing function is not shown here.
// NOTE(review): presumably the tail of storage.NewChunkManagerFactoryWithParam,
// which Init uses to set f.chunkManagerFactory — confirm against the source.
return &ChunkManagerFactory{
	persistentStorage: persistentStorage,
	config: c,
}
f.msgStreamFactory:
func (f *DefaultFactory) initMQ(standalone bool, params *paramtable.ComponentParam) error {
mqType := mustSelectMQType(standalone, params.MQCfg.Type.GetValue(), mqEnable{params.RocksmqEnable(), params.NatsmqEnable(), params.PulsarEnable(), params.KafkaEnable()})
log.Info("try to init mq", zap.Bool("standalone", standalone), zap.String("mqType", mqType))
switch mqType {
case mqTypeNatsmq:
f.msgStreamFactory = msgstream.NewNatsmqFactory()
case mqTypeRocksmq:
f.msgStreamFactory = smsgstream.NewRocksmqFactory(params.RocksmqCfg.Path.GetValue(), ¶ms.ServiceParam)
case mqTypePulsar:
f.msgStreamFactory = msgstream.NewPmsFactory(¶ms.ServiceParam)
case mqTypeKafka:
f.msgStreamFactory = msgstream.NewKmsFactory(¶ms.ServiceParam)
}
if f.msgStreamFactory == nil {
return errors.New("failed to create MQ: check the milvus log for initialization failures")
}
return nil
}
持久存储
storage.Factory是创建持久存储的工厂接口。
storage.ChunkManagerFactory是storage.Factory的实现。
NewPersistentStorageChunkManager()接口的实现:
// NewPersistentStorageChunkManager implements dependency.Factory by delegating
// to f.chunkManagerFactory, and returns its result unchanged.
func (f *DefaultFactory) NewPersistentStorageChunkManager(ctx context.Context) (storage.ChunkManager, error) {
	return f.chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
}
// NewPersistentStorageChunkManager creates a ChunkManager for the engine named
// by f.persistentStorage by calling newChunkManager.
func (f *ChunkManagerFactory) NewPersistentStorageChunkManager(ctx context.Context) (ChunkManager, error) {
	return f.newChunkManager(ctx, f.persistentStorage)
}
// newChunkManager builds the ChunkManager that corresponds to engine.
//
// Recognized engines are "local", "minio" and "remote"; any other value
// yields a nil manager and an error naming the unsupported engine.
func (f *ChunkManagerFactory) newChunkManager(ctx context.Context, engine string) (ChunkManager, error) {
	if engine == "local" {
		root := RootPath(f.config.rootPath)
		return NewLocalChunkManager(root), nil
	}
	if engine == "minio" {
		return newMinioChunkManagerWithConfig(ctx, f.config)
	}
	if engine == "remote" {
		return NewRemoteChunkManager(ctx, f.config)
	}
	return nil, errors.New("no chunk manager implemented with engine: " + engine)
}
根据传入的engine新建对应的持久存储对象。
对应的实现有:LocalChunkManager、MinioChunkManager、RemoteChunkManager。
// LocalChunkManager is responsible for reading and writing local files.
type LocalChunkManager struct {
	// NOTE(review): presumably the local root directory — confirm.
	localPath string
}
// MinioChunkManager is responsible for reading and writing data stored in minio.
// It embeds *minio.Client.
type MinioChunkManager struct {
	*minio.Client
	bucketName string
	rootPath string
}
// RemoteChunkManager is responsible for reading and writing data through an
// ObjectStorage client.
// NOTE(review): the original comment said "stored in minio", which looks
// copied from MinioChunkManager — confirm the intended wording.
type RemoteChunkManager struct {
	client ObjectStorage
	bucketName string
	rootPath string
}
消息中间件
msgstream.Factory是创建mq的工厂接口。
工厂接口:
// pkg\mq\msgstream\msgstream.go
// Factory is the factory interface for message streams.
type Factory interface {
	// NewMsgStream returns a MsgStream, or an error.
	NewMsgStream(ctx context.Context) (MsgStream, error)
	// NewTtMsgStream returns a MsgStream, or an error.
	// NOTE(review): presumably the time-tick variant — confirm.
	NewTtMsgStream(ctx context.Context) (MsgStream, error)
	// NewMsgStreamDisposer returns a func([]string, string) error.
	NewMsgStreamDisposer(ctx context.Context) func([]string, string) error
}
实现有:
CommonFactory、KmsFactory、PmsFactory
// CommonFactory is a Factory for creating message streams with common logic.
// It contains a function field named Newer, which is a function that creates
// an mqwrapper.Client when called.
// pkg\mq\msgstream\common_mq_factory.go
type CommonFactory struct {
	Newer func(context.Context) (mqwrapper.Client, error) // client constructor
	DispatcherFactory ProtoUDFactory
	ReceiveBufSize int64
	MQBufSize int64
}
// pkg\mq\msgstream\mq_factory.go
// KmsFactory is the Kafka msgstream factory.
type KmsFactory struct {
	dispatcherFactory ProtoUDFactory
	config *paramtable.KafkaConfig
	ReceiveBufSize int64
	MQBufSize int64
}
// PmsFactory is a pulsar msgstream factory that implements the Factory interface (msgstream.go).
// pkg\mq\msgstream\mq_factory.go
// This is the Pulsar factory.
type PmsFactory struct {
	dispatcherFactory ProtoUDFactory
	// the following members must be public, so that mapstructure.Decode() can access them
	PulsarAddress string
	PulsarWebAddress string
	ReceiveBufSize int64
	MQBufSize int64
	PulsarAuthPlugin string
	PulsarAuthParams string
	PulsarTenant string
	PulsarNameSpace string
	RequestTimeout time.Duration
	metricRegisterer prometheus.Registerer
}
mq产品
mq的产品接口是msgstream.MsgStream
// MsgStream is an interface used to produce and consume messages on a message queue.
type MsgStream interface {
	Close()
	// AsProducer takes the channel names to produce to.
	AsProducer(channels []string)
	// Produce takes a *MsgPack and returns an error.
	Produce(*MsgPack) error
	SetRepackFunc(repackFunc RepackFunc)
	GetProduceChannels() []string
	// Broadcast takes a *MsgPack and returns a map from string to MessageID slices.
	// NOTE(review): the map key is presumably the channel name — confirm.
	Broadcast(*MsgPack) (map[string][]MessageID, error)
	// AsConsumer takes the channel names, a subscription name and an initial position.
	AsConsumer(ctx context.Context, channels []string, subName string, position mqwrapper.SubscriptionInitialPosition) error
	// Chan returns a receive-only channel of *MsgPack.
	Chan() <-chan *MsgPack
	Seek(ctx context.Context, offset []*MsgPosition) error
	GetLatestMsgID(channel string) (MessageID, error)
	CheckTopicValid(channel string) error
	EnableProduce(can bool)
}
具体产品实现有:
msgstream.mqMsgStream、msgstream.MqTtMsgStream
// mqMsgStream is listed by this article as a concrete implementation of MsgStream.
// It holds an mqwrapper.Client plus producers and consumers in string-keyed maps.
type mqMsgStream struct {
	ctx context.Context
	client mqwrapper.Client
	// NOTE(review): the map keys are presumably channel names — confirm.
	producers map[string]mqwrapper.Producer
	producerChannels []string
	consumers map[string]mqwrapper.Consumer
	consumerChannels []string
	repackFunc RepackFunc
	unmarshal UnmarshalDispatcher
	receiveBuf chan *MsgPack
	closeRWMutex *sync.RWMutex
	streamCancel func()
	bufSize int64
	producerLock *sync.RWMutex
	consumerLock *sync.Mutex
	closed int32
	onceChan sync.Once
	enableProduce atomic.Value
}
// MqTtMsgStream is a msgstream that contains timeticks.
// It embeds *mqMsgStream and keeps additional state in maps keyed by mqwrapper.Consumer.
type MqTtMsgStream struct {
	*mqMsgStream
	chanMsgBuf map[mqwrapper.Consumer][]TsMsg
	chanMsgPos map[mqwrapper.Consumer]*msgpb.MsgPosition
	chanStopChan map[mqwrapper.Consumer]chan bool
	chanTtMsgTime map[mqwrapper.Consumer]Timestamp
	chanMsgBufMutex *sync.Mutex
	chanTtMsgTimeMutex *sync.RWMutex
	chanWaitGroup *sync.WaitGroup
	lastTimeStamp Timestamp
	syncConsumer chan int
}
存储产品
存储的产品接口是storage.ChunkManager
// ChunkManager manages chunks.
// It includes reading, writing and removing chunks.
type ChunkManager interface {
	// RootPath returns current root path.
	RootPath() string
	// Path returns path of @filePath.
	Path(ctx context.Context, filePath string) (string, error)
	// Size returns the size of @filePath as an int64.
	// NOTE(review): the unit is presumably bytes — confirm.
	Size(ctx context.Context, filePath string) (int64, error)
	// Write writes @content to @filePath.
	Write(ctx context.Context, filePath string, content []byte) error
	// MultiWrite writes the entries of @contents.
	// NOTE(review): the map key is presumably the file path — confirm.
	MultiWrite(ctx context.Context, contents map[string][]byte) error
	// Exist returns true if @filePath exists.
	Exist(ctx context.Context, filePath string) (bool, error)
	// Read reads @filePath and returns content.
	Read(ctx context.Context, filePath string) ([]byte, error)
	// Reader returns a FileReader for @filePath.
	Reader(ctx context.Context, filePath string) (FileReader, error)
	// MultiRead reads @filePaths and returns their contents.
	MultiRead(ctx context.Context, filePaths []string) ([][]byte, error)
	// ListWithPrefix takes @prefix and @recursive and returns a string slice and a time slice.
	// NOTE(review): presumably the matching paths and their modification times — confirm.
	ListWithPrefix(ctx context.Context, prefix string, recursive bool) ([]string, []time.Time, error)
	// ReadWithPrefix reads files with same @prefix and returns contents.
	ReadWithPrefix(ctx context.Context, prefix string) ([]string, [][]byte, error)
	// Mmap returns a *mmap.ReaderAt for @filePath.
	Mmap(ctx context.Context, filePath string) (*mmap.ReaderAt, error)
	// ReadAt reads @filePath starting at offset @off with the requested @length
	// and returns the content as @p.
	// NOTE(review): the original comment mentioned an @n byte count and io.EOF
	// once all bytes are read; the signature has no @n, so confirm the EOF
	// behavior against the implementations.
	// Other errors are returned if the read failed.
	ReadAt(ctx context.Context, filePath string, off int64, length int64) (p []byte, err error)
	// Remove deletes @filePath.
	Remove(ctx context.Context, filePath string) error
	// MultiRemove deletes @filePaths.
	MultiRemove(ctx context.Context, filePaths []string) error
	// RemoveWithPrefix removes files with same @prefix.
	RemoveWithPrefix(ctx context.Context, prefix string) error
}
具体产品实现有:
LocalChunkManager、MinioChunkManager、RemoteChunkManager
// LocalChunkManager is responsible for reading and writing local files.
type LocalChunkManager struct {
	// NOTE(review): presumably the local root directory — confirm.
	localPath string
}
// MinioChunkManager is responsible for reading and writing data stored in minio.
// It embeds *minio.Client.
type MinioChunkManager struct {
	*minio.Client
	// ctx context.Context
	bucketName string
	rootPath string
}
// RemoteChunkManager is responsible for reading and writing data through an
// ObjectStorage client.
// NOTE(review): the original comment said "stored in minio", which looks
// copied from MinioChunkManager — confirm the intended wording.
type RemoteChunkManager struct {
	client ObjectStorage
	// ctx context.Context
	bucketName string
	rootPath string
}
总结
从代码框架可以看出每一种mq都有一个工厂,存储只有一个工厂