go语言并发实战——日志收集系统(十) 重构tailfile模块实现同时监控多个日志文件

前言

在上一篇文章中,我们实现了通过etcd来同时指定多个不同的有关分区与日志文件的路径,但是锁着一次读取配置的增多,不可避免的出现了一个问题:我们如何来监控多个日志文件,这样原来的tailFile模块相对于当下场景就显得有些捉襟见肘了,所以对tialFile模块进行重构就成了我们必须要做的事情了。

TailFiile模块的重构流程

储存数据结构体的重构

在上一篇博文中我们定义了collectEntry来储存我们从etcd中get到的信息,但是,这个获取的消息在tailFile模块也需要使用,所以这里我们再创建一个common模块来专门储存这个数据:

type CollectEntry struct {
	Path  string `json:"path"`
	Topic string `json:"topic"`
}

tailFile模块中也需要一个结构体来储存需要的信息:

type tailTask struct{
	path string
	topic string
	TailObj *tail.Tail
}

tail初始化模块的重构

由于现在我们的配置信息全部储存到了 CollectEntry结构体中,它会给tail的初始化函数传递一个CollectEntry结构体数组,所以我们需要对之前的tail模块代码进行重构与细化,如下:

type tailTask struct {
	path    string
	topic   string
	TailObj *tail.Tail
}

func NewTailTask(path, topic string) (tt *tailTask) {
	tt = &tailTask{
		path:  path,
		topic: topic,
	}
	return tt
}

func (task *tailTask) Init() (err error) {
	config := tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
	}
	task.TailObj, err = tail.TailFile(task.path, config)
	if err != nil {
		logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
		return
	}
	return
}

func InitTail(collectEntryList []common.CollectEntry) (err error) {
	for _, entry := range collectEntryList {
		tt := NewTailTask(entry.Path, entry.Topic)
		err = tt.Init()
		if err != nil {
			logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
			continue
		}
		go tt.run()
	}
	return
}

之前我们只有一个日志需要监控,所以主要的工作流程可以放在man.go中,但是现在会创建多个tailTask来监控,我们最好将他移动到tail模块中,最后tail模块的全部代码为:

package tailFile

import (
	"github.com/Shopify/sarama"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/hpcloud/tail"
	"log-agent/Kafka"
	"log-agent/common"
	"strings"
	"time"
)

type tailTask struct {
	path    string
	topic   string
	TailObj *tail.Tail
}

func NewTailTask(path, topic string) (tt *tailTask) {
	tt = &tailTask{
		path:  path,
		topic: topic,
	}
	return tt
}

func (task *tailTask) Init() (err error) {
	config := tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
	}
	task.TailObj, err = tail.TailFile(task.path, config)
	if err != nil {
		logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
		return
	}
	return
}

func InitTail(collectEntryList []common.CollectEntry) (err error) {
	for _, entry := range collectEntryList {
		tt := NewTailTask(entry.Path, entry.Topic)
		err = tt.Init()
		if err != nil {
			logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
			continue
		}
		go tt.run()
	}
	return
}

func (t *tailTask) run() {
	for {
		line, ok := <-t.TailObj.Lines
		if !ok {
			logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
			time.Sleep(2 * time.Second)
			continue
		}
		if len(strings.Trim(line.Text, "\r")) == 0 {
			continue
		}
		msg := &sarama.ProducerMessage{}
		msg.Topic = t.topic
		msg.Value = sarama.StringEncoder(line.Text)
		Kafka.MesChan(msg)
	}
}

修改模块的全部代码

  • main.go
package main

import (
	"fmt"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/go-ini/ini"
	"log-agent/Kafka"
	"log-agent/etcd"
	"log-agent/tailFile"
)

type Config struct {
	Kafakaddress Kafkaddress `ini:"kafka"`
	LogFilePath  LogFilePath `ini:"collect"`
	Etcdaddress  EtcdAddress `ini:"etcd"`
}

type Kafkaddress struct {
	Addr        []string `ini:"address"`
	Topic       string   `ini:"topic"`
	MessageSize int64    `ini:"chan_size"`
}

type LogFilePath struct {
	Path string `ini:"logfile_path"`
}

type EtcdAddress struct {
	Addr []string `ini:"address"`
	Key  string   `ini:"collect_key"`
}

func run() {
	select {}
}

func main() {
	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

	//初始化etcd
	err = etcd.Init(ConfigObj.Etcdaddress.Addr)
	if err != nil {
		logrus.Error("InitEtcd failed, err:%v", err)
		return
	}
	logrus.Infof("InitEtcd success")

	//拉取要收集日志文件的配置项
	err, collectEntryList := etcd.GetConf(ConfigObj.Etcdaddress.Key)
	if err != nil {
		logrus.Error("GetConf failed, err:%v", err)
		return
	}
	fmt.Println(collectEntryList)
	//初始化tail

	err = tailFile.InitTail(collectEntryList)
	if err != nil {
		logrus.Error("InitTail failed, err:%v", err)
		return
	}
	logrus.Infof("InitTail success")
	run()
}

  • common.go
package common

type CollectEntry struct {
	Path  string `json:"path"`
	Topic string `json:"topic"`
}

  • tailFile.go
package tailFile

import (
	"github.com/Shopify/sarama"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/hpcloud/tail"
	"log-agent/Kafka"
	"log-agent/common"
	"strings"
	"time"
)

type tailTask struct {
	path    string
	topic   string
	TailObj *tail.Tail
}

func NewTailTask(path, topic string) (tt *tailTask) {
	tt = &tailTask{
		path:  path,
		topic: topic,
	}
	return tt
}

func (task *tailTask) Init() (err error) {
	config := tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
	}
	task.TailObj, err = tail.TailFile(task.path, config)
	if err != nil {
		logrus.Error("tail create tailObj for path:%s,err:%v", task.path, err)
		return
	}
	return
}

func InitTail(collectEntryList []common.CollectEntry) (err error) {
	for _, entry := range collectEntryList {
		tt := NewTailTask(entry.Path, entry.Topic)
		err = tt.Init()
		if err != nil {
			logrus.Error("tail create tailObj for path:%s,err:%v", entry.Path, err)
			continue
		}
		go tt.run()
	}
	return
}

func (t *tailTask) run() {
	for {
		line, ok := <-t.TailObj.Lines
		if !ok {
			logrus.Warn("tailFile.TailObj.Lines channel closed,path:%s\n", t.path)
			time.Sleep(2 * time.Second)
			continue
		}
		if len(strings.Trim(line.Text, "\r")) == 0 {
			continue
		}
		msg := &sarama.ProducerMessage{}
		msg.Topic = t.topic
		msg.Value = sarama.StringEncoder(line.Text)
		Kafka.MesChan(msg)
	}
}

运行结果

在这里插入图片描述
当你对不同日志文件修改都有反馈时就代表运行成功啦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/576262.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【JavaScript】内置对象 ④ ( Math 内置对象常用方法 | 取绝对值 | 向下取整 | 向上取整 | 四舍五入取整 | 取随机数 )

文章目录 一、Math 内置对象常用方法1、计算绝对值 - Math.abs2、取整计算 - Math.floor 向下取整 / Math.ceil 向上取整 / Math.round 四舍五入3、随机数 - Math.random4、代码示例 - 猜随机数 一、Math 内置对象常用方法 1、计算绝对值 - Math.abs 向 Math.abs() 方法中 传入…

简单的jmeter脚本自动化

1、创建线程组&#xff0c;定义自定义变量&#xff0c;保存请求默认值 2、用csv编写测试用例 备注&#xff1a;如果单元格内本身就有引号&#xff0c;则格式会有点小问题&#xff0c;不能直接修改为csv 用txt打开后 有引号的需要在最外层多包一层引号&#xff0c;每个引号前…

LM1875L-TB5-T 音频功率放大器 PDF中文资料_参数_引脚图

LM1875L-TB5-T 规格信息&#xff1a; 商品类型音频功率放大器 音频功率放大器的类型- 输出类型1-Channel (Mono) 作业电压16V ~ 60V 输出功率25W x 1 4Ω 额外特性过流保护,热保护 UTC LM1875是一款单片功率放大器&#xff0c;可为消费类音频应 用提供极低失真和高品质的…

外星人电脑丢失文件怎么找回?六大方法助你重获希望

对于许多依赖电脑进行日常工作和娱乐活动的用户来说&#xff0c;电脑中存储的文件无疑是宝贵的财富。然而&#xff0c;意外总是难以避免&#xff0c;外星人电脑也不例外。文件丢失、误删、硬盘故障等问题都可能给用户带来不小的困扰。那么&#xff0c;当外星人电脑遭遇文件丢失…

南京邮电大学计算机组成与结构四次实验报告

文章目录 资源链接预览实验一&#xff1a;算术逻辑运算实验实验二&#xff1a;存储器和总线实验实验三&#xff1a;通用寄存器实验实验四&#xff1a;综合实验的调试 资源链接 资源链接 预览 实验一&#xff1a;算术逻辑运算实验 实验二&#xff1a;存储器和总线实验 实验三&…

07 流量回放实现自动化回归测试

在本模块的前四讲里&#xff0c;我向你介绍了可以直接落地的、能够支撑百万并发的读服务的系统架构&#xff0c;包含懒加载缓存、全量缓存&#xff0c;以及数据同步等方案的技术细节。 基于上述方案及细节&#xff0c;你可以直接对你所负责的读服务进行架构升级&#xff0c;将…

pytorch-激活函数与GPU加速

目录 1. sigmod和tanh2. relu3. Leaky Relu4. selu5. softplus6. GPU加速7. 使用GPU加速手写数据训练 1. sigmod和tanh sigmod梯度区间是0&#xff5e;1&#xff0c;当梯度趋近0或者1时会出现梯度弥散的问题。 tanh区间时-1&#xff5e;1&#xff0c;是sigmod经过平移和缩放而…

第 3 篇 : Netty离线消息处理(可跳过)

说明 仅是个人的不成熟想法, 未深入研究验证 1. 修改 NettyServerHandler类 package com.hahashou.netty.server.config;import com.alibaba.fastjson.JSON; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHan…

云计算时代:SFP、SFP+、SFP28、QSFP+和QSFP28光纤模块详解

随着数据中心的快速发展和云计算的广泛应用&#xff0c;高速、高效率的光纤网络传输成为关键需求。在众多光纤模块中&#xff0c;SFP、SFP、SFP28、QSFP和QSFP28是最常见的几种类型。本文将为您详细解析这几种光纤模块之间的区别&#xff0c;帮助您更好地了解和选择适合自己需求…

【产品经理修炼之道】- B端产品用户层级与需求优先级

B端的需求和C端有比较大的差异&#xff1a;C端的用户画像&#xff0c;在B端更多是以角色、权力和义务的划分。在这种情况下&#xff0c;我们的需求处理方式也会有所不同。 交互设计其实就是用户的行为设计&#xff0c;既然是围绕用户的行为&#xff0c;那么我们首先得清楚我们的…

flutter 微信输入框 (第二版)

微信的聊天输入框之前实现了一个版本&#xff08;flutter 微信聊天输入框_flutter 聊天输入框-CSDN博客&#xff09;&#xff0c; 但是之前实现的不太优雅。这两天重写了一遍。效果如下&#xff1a; 1.页面拆分 这里我们把 聊天的页面进行 拆分&#xff1a;Scaffold &#xff0…

免费预约即将截止,5月7日上海TCT亚洲3D打印展参观指南,收藏!

进入TCT亚洲展官网&#xff08;网页搜索TCT亚洲展&#xff09;&#xff0c;免费登记预约 2024年TCT亚洲展作为推动增材制造在亚洲市场的业务交流的重要平台&#xff0c;将于2024年5月7日至9日在国家会展中心&#xff08;上海&#xff09;7.1&8.1馆举办&#xff0c;与海内外…

二 SSM整合实操

SSM整合实操 一 依赖管理 数据库准备 mysql8.0.33 CREATE DATABASE mybatis-example;USE mybatis-example;CREATE TABLE t_emp(emp_id INT AUTO_INCREMENT,emp_name CHAR(100),emp_salary DOUBLE(10,5),PRIMARY KEY(emp_id) );INSERT INTO t_emp(emp_name,emp_salary) VALUE…

短视频素材有哪些?短视频素材哪一类最吸引人?

随着视频内容在全球各种媒体和平台上的普及&#xff0c;寻找能够让你的项目脱颖而出的视频素材变得尤为重要。以下视频素材网站各具特色&#xff0c;提供从自然风景到都市快照&#xff0c;从简单背景到复杂动画的多样选择。 1. 蛙学府&#xff08;中国&#xff09; 提供4K高解…

全志ARM-蜂鸣器

操作准备&#xff1a; 1.使Tab键的缩进和批量对齐为4格 在/etc/vim/vimrc 中添加一项配置 set tabstop 4; 也可以再加一行 set nu显示代码的行数 vim的设置&#xff0c;修改/etc/vim/vimrc文件&#xff0c;需要用超级用户权限 /etc/vim/vimrc set shiftwidth4 设置批量对…

VsCode一直连接不上 timed out

前言 前段时间用VsCode连接远程服务器&#xff0c;正常操作后总是连接不上&#xff0c;折磨了半个多小时&#xff0c;后面才知道原来是服务器设置的问题&#xff0c;故记录一下&#xff0c;防止后面的小伙伴也踩坑。 我使用的是阿里云服务器&#xff0c;如果是使用其他平台服务…

web(微博发布案例)

示例&#xff1a; 1、检测空白内容 2、发布内容 html: <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta …

vue+element之解决upload组件上传文件失败后仍显示在列表上、自动上传、过滤、findIndex、splice、filter

MENU 前言错误案例(没有用)正确方法结束语 前言 el-upload上传失败后&#xff0c;文件仍显示在列表上。 这个pdf文件上传失败&#xff0c;仍显示在列表&#xff0c;给人错觉是上传成功&#xff0c;所以要把它去掉。 在element中&#xff0c;file-list和v-model:file-list是用于…

苹果一次性开源了8个大模型! 包含模型权重、训练日志和设置,OpenELM全面开源

不以开放性著称的苹果居然同时开源了大模型的权重、训练和评估框架&#xff0c;涵盖训练日志、多个保存点和预训练设置。同时升级计算机视觉工具包 CVNets 为 CoreNet&#xff01;支持 OpenELM&#xff01; ▲图1.由Stable Diffusion3生成。 OpenELM是Apple苹果公司最新推出的…

【产品经理修炼之道】- 如何分析一个产品

新人产品经理面试的时候&#xff0c;常被问到的一个问题是&#xff1a;如何评价一款产品。这个问题&#xff0c;我们可以从五个层级一个模型来解答&#xff0c;看你能分析到哪一层。 初级产品经理面试时&#xff0c;经常会问这样的问题&#xff1a; 1&#xff09;你是最喜欢的…