Best Practices for Collecting Business Logs from Kafka

Introduction

Apache Kafka is a distributed streaming platform used primarily to build real-time data pipelines and streaming applications. In a business-log collection scenario, Kafka acts as message middleware that receives, stores, and forwards large volumes of log data. It can be integrated with other systems (such as Elasticsearch, Flume, and Spark Streaming) for richer log processing and analysis. This article covers integration with Guance: the Guance collector, Datakit, consumes business logs from Kafka. The examples below walk through the integration end to end.

Environment

Prerequisites

  • Register a Guance account

Software and Middleware

  • Kafka 3.2.0
  • Datakit collector
  • JDK 8

Hardware

  • One cloud server: CentOS 7.9 64-bit, 4 vCPUs, 8 GB RAM, 100 GB cloud disk.

Integration Steps

Prepare the Kafka Environment

Install Kafka

Download version 3.2.0; it is ready to use once extracted.

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz
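
Extract the archive and switch into the resulting directory (the directory name follows the archive name; all Kafka commands below are run from here):

$ tar -xzf kafka_2.13-3.2.0.tgz
$ cd kafka_2.13-3.2.0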

Note: Datakit currently supports Kafka versions 0.8.2 through 3.2.0.

Start the ZooKeeper Service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start the Kafka Server
$ bin/kafka-server-start.sh config/server.properties
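Optionally, confirm that both processes are up with jps, which ships with the JDK; it should list QuorumPeerMain (ZooKeeper) and Kafka:
$ jps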
Create a Topic

Create a topic named testlog.

$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092
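You can verify that the topic exists with --describe:
$ bin/kafka-topics.sh --describe --topic testlog --bootstrap-server localhost:9092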
Start a Producer
$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092
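As a sanity check independent of Datakit, you can also start a console consumer in a second terminal to watch messages arrive on the topic:
$ bin/kafka-console-consumer.sh --topic testlog --from-beginning --bootstrap-server localhost:9092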

Install DataKit

Install the DataKit collector by following the official documentation.

Fill in the TOKEN according to your Guance workspace:
DK_DATAWAY=https://openway.guance.com?token=<TOKEN> bash -c "$(curl -L https://static.guance.com/datakit/install.sh)"
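
After installation, you can confirm the collector is running. On CentOS, Datakit is registered as a systemd service; the service name datakit below is an assumption worth verifying on your host:

$ systemctl status datakit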

Enable the Kafka Collector

Go to the conf.d/kafkamq directory under the DataKit installation directory (/usr/local/datakit by default), and copy kafkamq.conf.sample to kafkamq.conf.
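
For example, with the default installation path:

$ cd /usr/local/datakit/conf.d/kafkamq
$ cp kafkamq.conf.sample kafkamq.conf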

The directory should then look similar to this:

-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample

Adjust the Kafka collector configuration as follows:

  • addrs = ["localhost:9092"]: in this article DataKit and Kafka run on the same host, so localhost is sufficient.
  • kafka_version = "3.2.0": the Kafka version used in this article.
  • [inputs.kafkamq.custom]: remove the leading comment marker "#".
  • [inputs.kafkamq.custom.log_topic_map]: remove the leading comment marker "#".
  • "testlog"="log.p": testlog is the name of the topic, and log.p is the field-extraction rule for Guance's Pipeline programmable data processor; the business logs involved and the contents of log.p are detailed under "Using Pipeline" below.
# {"version": "1.28.1", "desc": "do NOT edit this line"}

[[inputs.kafkamq]]
  addrs = ["localhost:9092"]
  # your kafka version:0.8.2 ~ 3.2.0
  kafka_version = "3.2.0"
  group_id = "datakit-group"
  # consumer group partition assignment strategy (range, roundrobin, sticky)
  assignor = "roundrobin"

  ## rate limit.
  #limit_sec = 100
  ## sample
  # sampling_rate = 1.0

  ## kafka tls config
  # tls_enable = true
  # tls_security_protocol = "SASL_PLAINTEXT"
  # tls_sasl_mechanism = "PLAIN"
  # tls_sasl_plain_username = "user"
  # tls_sasl_plain_password = "pw"

  ## -1:Offset Newest, -2:Offset Oldest
  offsets=-1

  ## skywalking custom
  #[inputs.kafkamq.skywalking]
    ## Required!send to datakit skywalking input.
    #dk_endpoint="http://localhost:9529"
    #thread = 8 
    #topics = [
    #  "skywalking-metrics",
    #  "skywalking-profilings",
    #  "skywalking-segments",
    #  "skywalking-managements",
    #  "skywalking-meters",
    #  "skywalking-logging",
    #]
    #namespace = ""

  ## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open !!!
  #[inputs.kafkamq.jaeger]
    ## Required! ipv6 is "[::1]:9529"
    #dk_endpoint="http://localhost:9529"
    #thread = 8 
    #source: agent,otel,others...
    #source = "agent"
    ## Required! topics
    #topics=["jaeger-spans","jaeger-my-spans"]

  ## user custom message with PL script.
  [inputs.kafkamq.custom]
    #spilt_json_body = true
    #thread = 8 
    ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].
    #[inputs.kafkamq.custom.spilt_topic_map]
    #  "log_topic"=true
    #  "log01"=false
    [inputs.kafkamq.custom.log_topic_map]
      "testlog"="log.p"
    #  "log01"="log_01.p"
    #[inputs.kafkamq.custom.metric_topic_map]
    #  "metric_topic"="metric.p"
    #  "metric01"="rum_apm.p"
    #[inputs.kafkamq.custom.rum_topic_map]
    #  "rum_topic"="rum_01.p"
    #  "rum_02"="rum_02.p"

  #[inputs.kafkamq.remote_handle]
    ## Required!
    #endpoint="http://localhost:8080"
    ## Required! topics
    #topics=["spans","my-spans"]
    # send_message_count = 100
    # debug = false
    # is_response_point = true
    # header_check = false
  
  ## Receive and consume OTEL data from kafka.
  #[inputs.kafkamq.otel]
    #dk_endpoint="http://localhost:9529"
    #trace_api="/otel/v1/trace"
    #metric_api="/otel/v1/metric"
    #trace_topics=["trace1","trace2"]
    #metric_topics=["otel-metric","otel-metric1"]
    #thread = 8 

  ## todo: add other input-mq

Note: after enabling or changing DataKit configuration, restart the collector (run datakit service -R in a shell).

Using Pipeline

Contents of the log.p rules (the file lives in the Datakit pipeline directory, /usr/local/datakit/pipeline by default):

# parse the Kafka message body as JSON
data = load_json(message)

# extract fields from the parsed JSON and promote them to tags
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol, protocol)
set_tag(response_code, response_code)

# map the HTTP response code to a log status level
group_between(response_code, [200, 300], "info", "status")
group_between(response_code, [400, 499], "warning", "status")
group_between(response_code, [500, 599], "error", "status")

# use the log's own start_time as the record timestamp
time = data["start_time"]
set_tag(time, time)
default_time(time)
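
Before wiring the script into the collector, you can dry-run it against a sample message using Datakit's pipeline debugging subcommand. The -P/-T flags below match recent 1.x releases but should be treated as an assumption; check datakit help pipeline for your version:

$ datakit pipeline -P log.p -T '{"protocol":"HTTP/1.1","response_code":504,"start_time":"2024-05-01T00:39:11.230Z"}'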

Results

Sample Business Logs

The sample business log file contents are as follows:

#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

Sending the Logs

With the producer running, send the following three logs one at a time: the first at info level ("response_code":204), the second at error level ("response_code":504), and the third at warn level ("response_code":404).

>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

The Three Business Logs Collected from Kafka via DataKit

Field Extraction from Business Logs with Pipeline

In the figure below, the protocol, response_code, and time fields are all extracted by the Pipeline.
