一、背景
大型应用程序为了便于后期排障、运营等,会将请求、日志、性能指标等数据保存到存储系统中。为了满足这些需求,我们需要进行数据采集,将数据高效地传输到存储系统中。
二、问题
- 采集逻辑往往只针对某个具体需求开发,并且需要修改业务代码,给业务带来较大负担,耦合度也过高
- 数据采集导致已有的服务请求延时变高
- 采集性能差,需要较长的时间才能采集完一批数据
- 服务关闭时会丢失数据
三、解决方案
- 针对问题1,我们可以将数据采集从业务服务中解耦出来,专门创建一个数据采集服务。业务程序只需要将数据传输到指定的中间件,至于数据的处理、采样、过滤、导出等逻辑都在采集服务中完成。
- 针对问题2,将数据导出由同步改为异步:业务程序只把数据写入通道,由后台开启的多个协程消费通道中的数据,这样对业务请求性能的影响微乎其微
- 针对问题3,合理设置采集的最长间隔和每批采集的数量,并使用高性能的数据中间件(比如 Redis、Kafka)作为中转
- 针对问题4,为导出器和采集服务增加关闭信号监听,程序关闭时先将缓冲中的数据全部导出,再退出
四、采集服务实现
4.1 架构设计
4.2 storage
负责从数据中间件中拿到数据,接口定义如下:
// AnalyticsStorage abstracts the data middleware (for example Redis) from
// which the collection service pulls buffered records.
type AnalyticsStorage interface {
	// Init configures the storage from an untyped configuration value.
	Init(config interface{}) error
	// GetName returns a short identifier for the storage backend.
	GetName() string
	// Connect establishes (or refreshes) the backend connection and reports
	// success.
	Connect() bool
	// GetAndDeleteSet returns the records stored under keyName and removes
	// them from the storage.
	GetAndDeleteSet(keyName string) []interface{}
}
redis的storage实现
import (
"crypto/tls"
"strconv"
"time"
redis "github.com/go-redis/redis/v7"
"github.com/marmotedu/errors"
"github.com/mitchellh/mapstructure"
genericoptions "github.com/marmotedu/iam/internal/pkg/options"
"github.com/marmotedu/iam/pkg/log"
)
// ------------------- REDIS CLUSTER STORAGE MANAGER -------------------------------

// RedisKeyPrefix is the key prefix that Init assigns to a
// RedisClusterStorageManager; fixKey prepends it to every key name.
const RedisKeyPrefix = "analytics-"

// defaultRedisAddress is the fallback node address used when no Redis
// address has been configured.
const defaultRedisAddress = "127.0.0.1:6379"
// redisClusterSingleton holds the most recently created Redis client.
// NewRedisClusterPool creates or replaces it; Connect copies it into r.db.
var redisClusterSingleton redis.UniversalClient

// RedisClusterStorageManager is a storage manager that uses the redis database.
type RedisClusterStorageManager struct {
	db        redis.UniversalClient       // active client; nil until Connect is called
	KeyPrefix string                      // prepended to key names by fixKey
	HashKeys  bool                        // not consulted by hashKey, which returns keys unchanged
	Config    genericoptions.RedisOptions // options decoded by Init
}
// NewRedisClusterPool returns a redis cluster client.
func NewRedisClusterPool(forceReconnect bool, config genericoptions.RedisOptions) redis.UniversalClient {
if !forceReconnect {
if redisClusterSingleton != nil {
log.Debug("Redis pool already INITIALIZED")
return redisClusterSingleton
}
} else {
if redisClusterSingleton != nil {
redisClusterSingleton.Close()
}
}
log.Debug("Creating new Redis connection pool")
maxActive := 500
if config.MaxActive > 0 {
maxActive = config.MaxActive
}
timeout := 5 * time.Second
if config.Timeout > 0 {
timeout = time.Duration(config.Timeout) * time.Second
}
var tlsConfig *tls.Config
if config.UseSSL {
tlsConfig = &tls.Config{
InsecureSkipVerify: config.SSLInsecureSkipVerify,
}
}
var client redis.UniversalClient
opts := &RedisOpts{
MasterName: config.MasterName,
Addrs: getRedisAddrs(config),
DB: config.Database,
Password: config.Password,
PoolSize: maxActive,
IdleTimeout: 240 * time.Second,
ReadTimeout: timeout,
WriteTimeout: timeout,
DialTimeout: timeout,
TLSConfig: tlsConfig,
}
if opts.MasterName != "" {
log.Info("--> [REDIS] Creating sentinel-backed failover client")
client = redis.NewFailoverClient(opts.failover())
} else if config.EnableCluster {
log.Info("--> [REDIS] Creating cluster client")
client = redis.NewClusterClient(opts.cluster())
} else {
log.Info("--> [REDIS] Creating single-node client")
client = redis.NewClient(opts.simple())
}
redisClusterSingleton = client
return client
}
// getRedisAddrs picks the Redis addresses to connect to.
//
// config.Addrs wins when it is non-empty. Otherwise, if config.Port is set, a
// single "Host:Port" entry is built. With neither, nil is returned and the
// option converters fall back to their default addresses.
func getRedisAddrs(config genericoptions.RedisOptions) []string {
	if len(config.Addrs) != 0 {
		return config.Addrs
	}
	if config.Port == 0 {
		return nil
	}
	return []string{config.Host + ":" + strconv.Itoa(config.Port)}
}
// RedisOpts is a locally defined type with redis.UniversalOptions as its
// underlying type.
//
// The redis library's own conversions from UniversalOptions to the
// per-topology option structs are not exported, so this file re-implements
// them as the simple(), cluster() and failover() methods. NewRedisClusterPool
// uses them to choose the client type itself instead of going through
// redis.NewUniversalClient().
type RedisOpts redis.UniversalOptions
// cluster converts the options into redis.ClusterOptions for a Redis Cluster
// client, using defaultRedisAddress as the only seed node when no address is
// configured.
//
// Like simple(), it does not modify the receiver: the fallback address only
// appears in the returned options. DB is not forwarded because Redis Cluster
// only supports database 0.
func (o *RedisOpts) cluster() *redis.ClusterOptions {
	addrs := o.Addrs
	if len(addrs) == 0 {
		addrs = []string{defaultRedisAddress}
	}
	return &redis.ClusterOptions{
		Addrs:              addrs,
		OnConnect:          o.OnConnect,
		Password:           o.Password,
		MaxRedirects:       o.MaxRedirects,
		ReadOnly:           o.ReadOnly,
		RouteByLatency:     o.RouteByLatency,
		RouteRandomly:      o.RouteRandomly,
		MaxRetries:         o.MaxRetries,
		MinRetryBackoff:    o.MinRetryBackoff,
		MaxRetryBackoff:    o.MaxRetryBackoff,
		DialTimeout:        o.DialTimeout,
		ReadTimeout:        o.ReadTimeout,
		WriteTimeout:       o.WriteTimeout,
		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,
		TLSConfig:          o.TLSConfig,
	}
}
// simple converts the options into redis.Options for a single-node client.
// Only the first configured address is used; defaultRedisAddress applies
// when none is set.
func (o *RedisOpts) simple() *redis.Options {
	single := &redis.Options{
		Addr:               defaultRedisAddress,
		OnConnect:          o.OnConnect,
		DB:                 o.DB,
		Password:           o.Password,
		MaxRetries:         o.MaxRetries,
		MinRetryBackoff:    o.MinRetryBackoff,
		MaxRetryBackoff:    o.MaxRetryBackoff,
		DialTimeout:        o.DialTimeout,
		ReadTimeout:        o.ReadTimeout,
		WriteTimeout:       o.WriteTimeout,
		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,
		TLSConfig:          o.TLSConfig,
	}
	if len(o.Addrs) > 0 {
		single.Addr = o.Addrs[0]
	}
	return single
}
// failover converts the options into redis.FailoverOptions for a
// Sentinel-backed client. The configured addresses are treated as Sentinel
// addresses; when none are set, "127.0.0.1:26379" (the default Sentinel
// port on localhost) is used.
//
// Like simple(), it does not modify the receiver: the fallback address only
// appears in the returned options.
func (o *RedisOpts) failover() *redis.FailoverOptions {
	sentinels := o.Addrs
	if len(sentinels) == 0 {
		sentinels = []string{"127.0.0.1:26379"}
	}
	return &redis.FailoverOptions{
		SentinelAddrs:      sentinels,
		MasterName:         o.MasterName,
		OnConnect:          o.OnConnect,
		DB:                 o.DB,
		Password:           o.Password,
		MaxRetries:         o.MaxRetries,
		MinRetryBackoff:    o.MinRetryBackoff,
		MaxRetryBackoff:    o.MaxRetryBackoff,
		DialTimeout:        o.DialTimeout,
		ReadTimeout:        o.ReadTimeout,
		WriteTimeout:       o.WriteTimeout,
		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,
		TLSConfig:          o.TLSConfig,
	}
}
// GetName reports the identifier of this storage backend, which is always
// "redis".
func (r *RedisClusterStorageManager) GetName() string {
	const backendName = "redis"
	return backendName
}
// Init initializes the redis cluster storage manager.
//
// config is decoded with mapstructure into a fresh genericoptions.RedisOptions
// stored in r.Config, and r.KeyPrefix is set to RedisKeyPrefix. A decode
// failure is returned to the caller, wrapped, instead of terminating the
// process. This makes the error result of Init meaningful and leaves the
// shutdown decision to the caller. On failure r.KeyPrefix is left untouched.
func (r *RedisClusterStorageManager) Init(config interface{}) error {
	r.Config = genericoptions.RedisOptions{}
	if err := mapstructure.Decode(config, &r.Config); err != nil {
		log.Errorf("Failed to decode configuration: %s", err.Error())
		return errors.Wrap(err, "failed to decode configuration")
	}
	r.KeyPrefix = RedisKeyPrefix
	return nil
}
// Connect will establish a connection to the r.db.
//
// Without a client, it obtains one from NewRedisClusterPool (reusing the
// shared pool if it already exists). With a client, it re-syncs r.db with
// redisClusterSingleton in case the shared pool was replaced. It always
// reports true.
func (r *RedisClusterStorageManager) Connect() bool {
	if r.db != nil {
		log.Debug("Storage Engine already initialized...")
		// Reset it just in case the shared pool has been swapped out.
		r.db = redisClusterSingleton
		return true
	}

	log.Debug("Connecting to redis cluster")
	r.db = NewRedisClusterPool(false, r.Config)
	return true
}
// hashKey is the key-hashing hook used by fixKey. It currently returns the
// key unchanged; the HashKeys field is not consulted.
func (r *RedisClusterStorageManager) hashKey(key string) string {
	unchanged := key
	return unchanged
}
// fixKey maps a caller-supplied key name to the actual Redis key by passing
// it through hashKey and prepending r.KeyPrefix.
//
// The debug line records the raw input key, matching its "Input key" label;
// callers that need the final key log the returned value themselves.
func (r *RedisClusterStorageManager) fixKey(keyName string) string {
	setKeyName := r.KeyPrefix + r.hashKey(keyName)
	log.Debugf("Input key was: %s", keyName)
	return setKeyName
}
// GetAndDeleteSet get and delete key from redis.
//
// It reads the whole list at the prefixed key (LRANGE 0 -1) and deletes the
// key, both inside one TxPipelined (MULTI/EXEC) call, and returns the list
// items as strings boxed in interface{}. If there is no client yet, it calls
// Connect first. If the transaction fails, the error is logged, Connect is
// called, and whatever LRANGE produced is returned (possibly empty).
func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string) []interface{} {
	// Retry until a client is available; each pass logs the request, as the
	// recursive form did.
	for {
		log.Debugf("Getting raw key set: %s", keyName)
		if r.db != nil {
			break
		}
		log.Warn("Connection dropped, connecting..")
		r.Connect()
	}

	log.Debugf("keyName is: %s", keyName)
	fixedKey := r.fixKey(keyName)
	log.Debugf("Fixed keyname is: %s", fixedKey)

	var listCmd *redis.StringSliceCmd
	if _, err := r.db.TxPipelined(func(pipe redis.Pipeliner) error {
		listCmd = pipe.LRange(fixedKey, 0, -1)
		pipe.Del(fixedKey)
		return nil
	}); err != nil {
		log.Errorf("Multi command failed: %s", err)
		r.Connect()
	}

	items := listCmd.Val()
	unpacked := make([]interface{}, 0, len(items))
	for _, item := range items {
		unpacked = append(unpacked, item)
	}
	log.Debugf("Unpacked vals: %d", len(unpacked))
	return unpacked
}
// SetKey will create (or update) a key value in the store.
//
// session is stored under the prefixed key with no expiry. When timeout is
// positive, an expiry of timeout seconds is then applied via SetExp. The SET
// result is checked before any EXPIRE is attempted, so a failed write is
// reported as a set failure instead of being masked by an expiry error on a
// key that was never written.
func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int64) error {
	log.Debugf("[STORE] SET Raw key is: %s", keyName)
	fixedKey := r.fixKey(keyName)
	log.Debugf("[STORE] Setting key: %s", fixedKey)

	r.ensureConnection()
	if err := r.db.Set(fixedKey, session, 0).Err(); err != nil {
		log.Errorf("Error trying to set value: %s", err.Error())
		return errors.Wrap(err, "failed to set key")
	}

	if timeout > 0 {
		return r.SetExp(keyName, timeout)
	}
	return nil
}
// SetExp is used to set the expiry of a key.
//
// timeout is in seconds and is applied to the prefixed key with EXPIRE. The
// connection is ensured first, so calling SetExp directly (not only through
// SetKey) cannot dereference a nil client. On failure the error is logged
// and returned wrapped; on success nil is returned.
func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error {
	r.ensureConnection()
	err := r.db.Expire(r.fixKey(keyName), time.Duration(timeout)*time.Second).Err()
	if err != nil {
		log.Errorf("Could not EXPIRE key: %s", err.Error())
		return errors.Wrap(err, "failed to set expire time for key")
	}
	return nil
}
// ensureConnection blocks until r.db holds a client, calling Connect as many
// times as needed and logging each retry.
func (r *RedisClusterStorageManager) ensureConnection() {
	if r.db != nil {
		return // already connected
	}

	log.Info("Connection dropped, reconnecting...")
	r.Connect()
	for r.db == nil {
		log.Info("Reconnecting again...")
		r.Connect()
	}
}
4.3 pump
我们首先会针对某种业务创建对应的数据结构,比如
// AnalyticsRecord is an example per-business data structure for one collected
// record. The json tags fix the key names (and, by field order, the key order)
// used by encoding/json. The bson tag on ExpireAt names the field for BSON
// encoders such as MongoDB drivers.
type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"` // unit not specified here — presumably Unix seconds; confirm
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt" bson:"expireAt"`
}
pump 负责将数据导出到指定的数据存储系统,比如 Prometheus、MongoDB、Elasticsearch 等,它的接口定义如下:
// Pump exports batches of records to a destination data system such as
// Prometheus, MongoDB or Elasticsearch.
type Pump interface {
	// GetName returns the pump's identifier.
	GetName() string
	// New returns a fresh Pump of the same kind.
	New() Pump
	// Init configures the pump from an untyped configuration value.
	Init(config interface{}) error
	// WriteData writes one batch of records to the destination.
	WriteData(ctx context.Context, data []interface{}) error
	// SetTimeout sets the pump's timeout (unit not specified here —
	// presumably seconds; confirm against implementations).
	SetTimeout(timeout int)
	// GetTimeout returns the pump's configured timeout.
	GetTimeout() int
}
4.4 exporter
exporter依赖storage和pump,每个exporter负责一种业务,一般对应一种数据结构
// AnalyticsRecord is the data structure an exporter converts raw storage
// entries into. Field order and json tags determine the encoding/json output.
// The bson tag on ExpireAt names the field for BSON encoders such as MongoDB
// drivers.
type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"` // unit not specified here — presumably Unix seconds; confirm
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt" bson:"expireAt"`
}
exporter 从 storage 中取出数据并转换成对应的数据结构,然后进行过滤并清空冗余字段的内容,最后通过 pump 将数据导出到目标数据系统中。exporter 的大致结构如下:
// AnalyticsExporter moves the records of one business from a storage into a
// pump, filtering them and optionally blanking redundant fields on the way.
type AnalyticsExporter struct {
	storage               storage.Storage
	pump                  pump.Pump
	filter                []filter.Filters // filters applied to the records before export
	timeout               int              // unit not specified here — TODO confirm
	OmitDetailedRecording bool             // when true, redundant fields are set to empty
}
// Export runs one export cycle. The body is only an outline; the intended
// steps are listed below.
func (e *AnalyticsExporter) Export() {
	// 1. Pull a batch of raw data from storage.
	// 2. Convert it into the corresponding data structure.
	// 3. Filter the data.
	// 4. Blank out redundant fields (presumably governed by OmitDetailedRecording).
	// 5. Export the data (presumably through e.pump).
}