go-高效处理应用程序数据

一、背景

大型的应用程序为了后期的排障、运营等,会将一些请求、日志、性能指标等数据保存到存储系统中。为了满足这些需求,我们需要进行数据采集,将数据高效的传输到存储系统

二、问题

  • 采集服务仅仅针对某个需求开发,需要修改业务代码逻辑,会给业务带来比较大的负担,并且耦合度太高
  • 数据采集导致已有的服务请求延时变高
  • 采集性能差,需要较长的时间才能采集完一批数据
  • 服务关闭时会丢失数据

三、解决方案

  • 针对问题1,我们可以将数据采集从业务服务中解耦出来,专门创建一个数据采集服务。业务程序只需要将数据传输到指定的中间件,至于数据的处理、采样、过滤、传出等逻辑都在采集服务中完成。
  • 针对问题2,将数据的导出由同步改为异步,异步开启多个协程消费通道中的数据,这样对程序的性能几乎微乎其微
  • 针对问题3,设置好采集最长间隔、批量采集大小以及使用高性能的数据中间件作为中转,比如redis、kafka
  • 针对问题4,为导出器和采集服务增加关闭监听,程序关闭时将数据清空完再退出

四、采集服务实现

4.1 架构设计

在这里插入图片描述

4.2 storage

负责从数据中间件中拿到数据,接口定义如下:

type AnalyticsStorage interface {
	Init(config interface{}) error
	GetName() string
	Connect() bool
	GetAndDeleteSet(string) []interface{}
}

redis的storage实现

import (
	"crypto/tls"
	"strconv"
	"time"

	redis "github.com/go-redis/redis/v7"
	"github.com/marmotedu/errors"
	"github.com/mitchellh/mapstructure"

	genericoptions "github.com/marmotedu/iam/internal/pkg/options"
	"github.com/marmotedu/iam/pkg/log"
)

// ------------------- REDIS CLUSTER STORAGE MANAGER -------------------------------

// RedisKeyPrefix defines prefix for iam analytics key.
const (
	RedisKeyPrefix      = "analytics-"
	defaultRedisAddress = "127.0.0.1:6379"
)

var redisClusterSingleton redis.UniversalClient

// RedisClusterStorageManager is a storage manager that uses the redis database.
type RedisClusterStorageManager struct {
	db        redis.UniversalClient
	KeyPrefix string
	HashKeys  bool
	Config    genericoptions.RedisOptions
}

// NewRedisClusterPool returns a redis cluster client.
func NewRedisClusterPool(forceReconnect bool, config genericoptions.RedisOptions) redis.UniversalClient {
	if !forceReconnect {
		if redisClusterSingleton != nil {
			log.Debug("Redis pool already INITIALIZED")

			return redisClusterSingleton
		}
	} else {
		if redisClusterSingleton != nil {
			redisClusterSingleton.Close()
		}
	}

	log.Debug("Creating new Redis connection pool")

	maxActive := 500
	if config.MaxActive > 0 {
		maxActive = config.MaxActive
	}

	timeout := 5 * time.Second

	if config.Timeout > 0 {
		timeout = time.Duration(config.Timeout) * time.Second
	}

	var tlsConfig *tls.Config
	if config.UseSSL {
		tlsConfig = &tls.Config{
			InsecureSkipVerify: config.SSLInsecureSkipVerify,
		}
	}

	var client redis.UniversalClient
	opts := &RedisOpts{
		MasterName:   config.MasterName,
		Addrs:        getRedisAddrs(config),
		DB:           config.Database,
		Password:     config.Password,
		PoolSize:     maxActive,
		IdleTimeout:  240 * time.Second,
		ReadTimeout:  timeout,
		WriteTimeout: timeout,
		DialTimeout:  timeout,
		TLSConfig:    tlsConfig,
	}

	if opts.MasterName != "" {
		log.Info("--> [REDIS] Creating sentinel-backed failover client")
		client = redis.NewFailoverClient(opts.failover())
	} else if config.EnableCluster {
		log.Info("--> [REDIS] Creating cluster client")
		client = redis.NewClusterClient(opts.cluster())
	} else {
		log.Info("--> [REDIS] Creating single-node client")
		client = redis.NewClient(opts.simple())
	}

	redisClusterSingleton = client

	return client
}

func getRedisAddrs(config genericoptions.RedisOptions) (addrs []string) {
	if len(config.Addrs) != 0 {
		addrs = config.Addrs
	}

	if len(addrs) == 0 && config.Port != 0 {
		addr := config.Host + ":" + strconv.Itoa(config.Port)
		addrs = append(addrs, addr)
	}

	return addrs
}

// RedisOpts is the overridden type of redis.UniversalOptions. simple() and cluster() functions are not public
// in redis library. Therefore, they are redefined in here to use in creation of new redis cluster logic.
// We don't want to use redis.NewUniversalClient() logic.
type RedisOpts redis.UniversalOptions

func (o *RedisOpts) cluster() *redis.ClusterOptions {
	if len(o.Addrs) == 0 {
		o.Addrs = []string{defaultRedisAddress}
	}

	return &redis.ClusterOptions{
		Addrs:     o.Addrs,
		OnConnect: o.OnConnect,

		Password: o.Password,

		MaxRedirects:   o.MaxRedirects,
		ReadOnly:       o.ReadOnly,
		RouteByLatency: o.RouteByLatency,
		RouteRandomly:  o.RouteRandomly,

		MaxRetries:      o.MaxRetries,
		MinRetryBackoff: o.MinRetryBackoff,
		MaxRetryBackoff: o.MaxRetryBackoff,

		DialTimeout:        o.DialTimeout,
		ReadTimeout:        o.ReadTimeout,
		WriteTimeout:       o.WriteTimeout,
		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,

		TLSConfig: o.TLSConfig,
	}
}

func (o *RedisOpts) simple() *redis.Options {
	addr := defaultRedisAddress
	if len(o.Addrs) > 0 {
		addr = o.Addrs[0]
	}

	return &redis.Options{
		Addr:      addr,
		OnConnect: o.OnConnect,

		DB:       o.DB,
		Password: o.Password,

		MaxRetries:      o.MaxRetries,
		MinRetryBackoff: o.MinRetryBackoff,
		MaxRetryBackoff: o.MaxRetryBackoff,

		DialTimeout:  o.DialTimeout,
		ReadTimeout:  o.ReadTimeout,
		WriteTimeout: o.WriteTimeout,

		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,

		TLSConfig: o.TLSConfig,
	}
}

func (o *RedisOpts) failover() *redis.FailoverOptions {
	if len(o.Addrs) == 0 {
		o.Addrs = []string{"127.0.0.1:26379"}
	}

	return &redis.FailoverOptions{
		SentinelAddrs: o.Addrs,
		MasterName:    o.MasterName,
		OnConnect:     o.OnConnect,

		DB:       o.DB,
		Password: o.Password,

		MaxRetries:      o.MaxRetries,
		MinRetryBackoff: o.MinRetryBackoff,
		MaxRetryBackoff: o.MaxRetryBackoff,

		DialTimeout:  o.DialTimeout,
		ReadTimeout:  o.ReadTimeout,
		WriteTimeout: o.WriteTimeout,

		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,

		TLSConfig: o.TLSConfig,
	}
}

// GetName returns the redis cluster storage manager name.
func (r *RedisClusterStorageManager) GetName() string {
	return "redis"
}

// Init initialize the redis cluster storage manager.
func (r *RedisClusterStorageManager) Init(config interface{}) error {
	r.Config = genericoptions.RedisOptions{}
	err := mapstructure.Decode(config, &r.Config)
	if err != nil {
		log.Fatalf("Failed to decode configuration: %s", err.Error())
	}

	r.KeyPrefix = RedisKeyPrefix

	return nil
}

// Connect will establish a connection to the r.db.
func (r *RedisClusterStorageManager) Connect() bool {
	if r.db == nil {
		log.Debug("Connecting to redis cluster")
		r.db = NewRedisClusterPool(false, r.Config)

		return true
	}

	log.Debug("Storage Engine already initialized...")

	// Reset it just in case
	r.db = redisClusterSingleton

	return true
}

func (r *RedisClusterStorageManager) hashKey(in string) string {
	return in
}

func (r *RedisClusterStorageManager) fixKey(keyName string) string {
	setKeyName := r.KeyPrefix + r.hashKey(keyName)

	log.Debugf("Input key was: %s", setKeyName)

	return setKeyName
}

// GetAndDeleteSet get and delete key from redis.
func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string) []interface{} {
	log.Debugf("Getting raw key set: %s", keyName)

	if r.db == nil {
		log.Warn("Connection dropped, connecting..")
		r.Connect()

		return r.GetAndDeleteSet(keyName)
	}

	log.Debugf("keyName is: %s", keyName)

	fixedKey := r.fixKey(keyName)

	log.Debugf("Fixed keyname is: %s", fixedKey)

	var lrange *redis.StringSliceCmd
	_, err := r.db.TxPipelined(func(pipe redis.Pipeliner) error {
		lrange = pipe.LRange(fixedKey, 0, -1)
		pipe.Del(fixedKey)

		return nil
	})
	if err != nil {
		log.Errorf("Multi command failed: %s", err)
		r.Connect()
	}

	vals := lrange.Val()

	result := make([]interface{}, len(vals))
	for i, v := range vals {
		result[i] = v
	}

	log.Debugf("Unpacked vals: %d", len(result))

	return result
}

// SetKey will create (or update) a key value in the store.
func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int64) error {
	log.Debugf("[STORE] SET Raw key is: %s", keyName)
	log.Debugf("[STORE] Setting key: %s", r.fixKey(keyName))

	r.ensureConnection()
	err := r.db.Set(r.fixKey(keyName), session, 0).Err()
	if timeout > 0 {
		if expErr := r.SetExp(keyName, timeout); expErr != nil {
			return expErr
		}
	}
	if err != nil {
		log.Errorf("Error trying to set value: %s", err.Error())

		return errors.Wrap(err, "failed to set key")
	}

	return nil
}

// SetExp is used to set the expiry of a key.
func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error {
	err := r.db.Expire(r.fixKey(keyName), time.Duration(timeout)*time.Second).Err()
	if err != nil {
		log.Errorf("Could not EXPIRE key: %s", err.Error())
	}

	return errors.Wrap(err, "failed to set expire time for key")
}

func (r *RedisClusterStorageManager) ensureConnection() {
	if r.db != nil {
		// already connected
		return
	}
	log.Info("Connection dropped, reconnecting...")
	for {
		r.Connect()
		if r.db != nil {
			// reconnection worked
			return
		}
		log.Info("Reconnecting again...")
	}
}

4.3 pump

我们首先会针对某种业务创建对应的数据结构,比如

type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"`
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

pump负责将数据导出到指定的数据存储系统,比如promethus、mongo、ES等,它的接口定义如下:

type Pump interface {
	GetName() string
	New() Pump
	Init(interface{}) error
	WriteData(context.Context, []interface{}) error
	SetTimeout(timeout int)
	GetTimeout() int
}

4.4 exporter

exporter依赖storage和pump,每个exporter负责一种业务,一般对应一种数据结构

type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"`
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

exporter从storage中取出数据转成对应的数据结构,并进行过滤和去掉冗余字段内容。最后将数据通过pump导出到数据系统中,exporter大概如下:

type AnalyticsExporter struct {
	storage storage.Storage
	pump    pump.Pump
	filter      []filter.Filters  //对数据进行过滤
	timeout               int
	OmitDetailedRecording bool  //将冗余字段置为空
}

func (e *AnalyticsExporter) Export() {
	//1.从storage中拉取数据
	//2. 转换为对应的数据结构
	//3. 过滤数据
	//4. 置空冗余字段
	//5. 导出数据
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/797744.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

树莓派pico入坑笔记,esp01/01s使用

目录 关于树莓派pico和circuitpython的更多玩法,请看树莓派pico专栏 说明 关于at指令 WiFi的at指令 UDP的at指令 样例程序 调试助手端输入指令 sta端程序 效果 进阶使用 库函数说明 样例代码 关于树莓派pico和circuitpython的更多玩法,请看树…

TensorFlow系列:第四讲:MobileNetV2实战

一. 加载数据集 编写工具类,实现数据集的加载 import keras""" 加载数据集工具类 """class DatasetLoader:def __init__(self, path_url, image_size(224, 224), batch_size32, class_modecategorical):self.path_url path_urlself…

H5的Canvas如何画N叉树数据结构

大家好。我是猿码叔叔,一位有着 5 年Java工作经验的北漂,业余时间喜欢瞎捣鼓,学习一些新东西来丰富自己。看过上一篇 Java 方法调用关系的老铁们,也许遗留了不少疑问,这Java方法调用关系可视化页面就这?这方…

护网HW面试——redis利用方式即复现

参考:https://xz.aliyun.com/t/13071 面试中经常会问到ssrf的打法,讲到ssrf那么就会讲到配合打内网的redis,本篇就介绍redis的打法。 未授权 原理: Redis默认情况下,会绑定在0.0.0.0:6379,如果没有采用相关…

基于SpringBoot的校园志愿者管理系统

你好呀,我是计算机学姐码农小野!如果有相关需求,可以私信联系我。 开发语言:Java 数据库:MySQL 技术:SpringBoot框架 工具:MyEclipse、Tomcat 系统展示 首页 个人中心 志愿者管理 活动信息…

黑马头条微服务学习day01-环境搭建、SpringCloud微服务(注册发现、网关)

文章目录 项目介绍环境搭建项目背景业务功能技术栈说明 nacos服务器环境准备nacos安装 初始工程搭建环境准备主体结构 app登录需求分析表结构分析手动加密微服务搭建接口定义功能实现登录功能实现 Swagger使用app端网关nginx配置 项目介绍 环境搭建 项目背景 业务功能 技术栈说…

数据结构(Java):树二叉树

目录 1、树型结构 1.1 树的概念 1.2 如何判断树与非树 1.3 树的相关概念 1.4 树的表示形式 1.4.1 孩子兄弟表示法 2、二叉树 2.1 二叉树的概念 2.2 特殊的二叉树 2.3 二叉树的性质 2.4 二叉树的存储 2.5 二叉树的遍历 1、树型结构 1.1 树的概念 树型结构是一种非线…

MySQL复合查询(重点)

前面我们讲解的mysql表的查询都是对一张表进行查询,在实际开发中这远远不够。 基本查询回顾 查询工资高于500或岗位为MANAGER的雇员,同时还要满足他们的姓名首字母为大写的J mysql> select * from emp where (sal>500 or jobMANAGER) and ename l…

数据湖仓一体(一) 编译hudi

目录 一、大数据组件版本信息 二、数据湖仓架构 三、数据湖仓组件部署规划 四、编译hudi 一、大数据组件版本信息 hudi-0.14.1zookeeper-3.5.7seatunnel-2.3.4kafka_2.12-3.5.2hadoop-3.3.5mysql-5.7.28apache-hive-3.1.3spark-3.3.1flink-1.17.2apache-dolphinscheduler-3.1.9…

[Vulnhub] Sedna BuilderEngine-CMS+Kernel权限提升

信息收集 IP AddressOpening Ports192.168.8.104TCP:22, 53, 80, 110, 111, 139, 143, 445, 993, 995, 8080, 55679 $ nmap -p- 192.168.8.104 --min-rate 1000 -sC -sV PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 6.6.1p1 Ubuntu 2ubuntu2 …

C++20中的consteval说明符

在C20中,立即函数(immediate function)是指每次调用该函数都会直接或间接产生编译时常量表达式(constant expression)的函数。这些函数在其返回类型前使用consteval关键字进行声明。 立即函数是constexpr函数,具体情况取决于其要求。与constexpr相同&…

半小时获得一张ESG入门证书【详细中英文笔记一】

前些日子,有朋友转发了一则小红书的笔记给我, 标题是《半小时获CFI官方高颜值免费证书 ESG认证》。这对考证狂魔的我来说,必然不能错过啊,有免费的羊毛不薅白不薅。 ESG课程的 CFI 官方网址戳这里:CFI 于是信心满满的…

清华大学孙富春教授团队开发了多模态数字孪生环境,辅助机器人获得复杂的 3C 装配技能

中国是全球3C产品(电脑、通信和消费电子)的主要生产国,全球70%的3C产品产能集中在中国。3C智能制造装备的突破与产业化,对于提升我国制造产业的全球竞争力意义重大。 机器人在计算机、通信和消费电子 (3C) …

常用的设计模式和使用案例汇总

常用的设计模式和使用案例汇总 【一】常用的设计模式介绍【1】设计模式分类【2】软件设计七大原则(OOP原则) 【二】单例模式【1】介绍【2】饿汉式单例【3】懒汉式单例【4】静态内部类单例【5】枚举(懒汉式) 【三】工厂方法模式【1】简单工厂模式&#xf…

springboot 程序运行一段时间后收不到redis订阅的消息

springboot 程序运行一段时间后收不到redis订阅的消息 问题描述 程序启动后redis.user.two主题正常是可以收到消息的,发一条收一条,但是隔一段时间后;就收不到消息了; 此时如果你手动调用发送另外一个消息订阅redis.user.two2&…

vmware workstation 虚拟机安装

vmware workstation 虚拟机安装 VMware Workstation Pro是VMware(威睿公司)发布的一代虚拟机软件,中文名称一般称 为"VMware 工作站".它的主要功能是可以给用户在单一的桌面上同时运行不同的操作系统,它也是可进 行开发…

c# 容器变换

List<Tuple<int, double, bool>> 变为List<Tuple<int, bool>>集合 如果您有一个List<Tuple<int, double, bool>>并且您想要将其转换为一个List<Tuple<int, bool>>集合&#xff0c;忽略double值&#xff0c;您可以使用LINQ的S…

3U 与 SV630A 伺服实现 CANLINK 通讯

1、打开 AUTOSHOP&#xff0c;点击工具>系统选项&#xff0c;勾选自动生成 canlink 轴 控通讯配置和 canlink 轴控指令增强功能。 2、检查 plc 的拨码是否已经拨上去。 1 代表 485 通讯&#xff0c;2 代表 can 通讯&#xff0c;将 2 打到 ON 状态。还有9&#xff0c;10拨…

Matlab 计算一个平面与一条直线的交点

文章目录 一、简介二、实现代码三、实现效果参考资料一、简介 这里使用一种很有趣的坐标:Plucker线坐标,它的定义如下所示: 这个坐标有个很有趣的性质,将直线 L L L与由其齐次坐标 V = (

IDEA社区版使用Maven archetype 创建Spring boot 项目

1.新建new project 2.选择Maven Archetype 3.命名name 4.选择存储地址 5.选择jdk版本 6.Archetype使用webapp 7.create创建项目 创建好长这样。 检查一下自己的Maven是否是自己的。 没问题的话就开始增添java包。 [有的人连resources包也没有&#xff0c;那就需要自己添…