弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,用户可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。

阿里云 Flink SLS Connector 对于结构化的日志非常直接,通过配置,SLS 的日志字段可以与 Flink SQL 的 Table 字段列一一映射;然后仍有大量的业务日志并非完全的结构化,例如会将所有日志内容写入一个字段中,需要正则提前、分隔符拆分等手段才可以提取出结构化的字段,基于这个场景,本文介绍一种使用 SLS SPL 配置 SLS Connector 完成数据结构化的方案,覆盖日志清洗与格式规整场景。

弱结构化日志处理的痛点

弱结构化日志现状与结构化处理需求的矛盾

日志数据往往是多种来源,多种格式,往往没有固定的 Schema,所以在数据处理前,需要先对数据进行清洗、格式规整,然后在进行数据分析;这类数据内容格式是不固定的,可能是 JSON 字符串、CSV 格式,甚至是不规则的 Java 堆栈日志。

Flink SQL 是一种兼容 SQL 语法的实时计算模型,可以基于 SQL 对结构化数据进行分析,但同时也要求源数据模式固定:字段名称、类型、数量是固定;这也是 SQL 计算模型的基础。

日志数据的弱结构化特点与 Flink SQL 结构化分析之间有着一道鸿沟,跨越这道鸿沟需要一个中间层来进行数据清洗、规整;这个中间层的方案有多种选择可以使用,下面会对不同的方案做简单对比,并提出一种新的基于 SLS SPL 的方案来轻量化完成解决数据清洗规整的工作。

弱结构化日志数据

下面是一条日志示例,日志格式较为复杂,既有 JSON 字符串,又有字符串与 JSON 混合的场景。其中:

  • Payload 为 JSON 字符串,其中 schedule 字段的内容也是一段 JSON 结构。
  • requestURL 为一段标准的 URL Path 路径。
  • error 字段是前半部分包含 CouldNotExecuteQuery:字符串,后半部分是一段 JSON 结构。
  • tag:path 包含日志文件的路径,其中 service_a 可能是业务名称。
  • caller 中包含文件名与文件行数。
{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.97.112",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

结构化数据处理需求

对于这样的日志提取出更有价值的信息需要进行数据清洗,首先需要提取重要的字段,然后对这些字段进行数据分析;本篇关注重要字段的提取,分析仍然可以在 Flink 中进行。

假设提取字段具体需求如下:

  • 提取 error 中的 httpCode、errorCode、errorMessage、requestID。
  • 提取 tag:path 中的 service_a 作为 serviceName。
  • 提取 caller 中的 pool.go 作为 fileName,64 作为 fileNo。
  • 提取 Payload 中的 project;提取 Payload 下面的 schedule 中的 type 为 scheuleType。
  • 重命名 source 为 serviceIP。
  • 其余字段舍弃。

最终需要的字段列表如下,基于这样一个表格模型,我们可以便捷的使用 Flink SQL 进行数据分析。

图片

解决方案

实现这样的数据清洗,有很多种方法,这里列举几种基于 SLS 与 Flink 的方案,不同方案之间没有绝对的优劣,需要根据不同的场景选择不同的方案。

数据加工方案: 在 SLS 控制台创建目标 Logstore,通过创建数据加工任务,完成对数据的清洗。

图片

Flink 方案: 将 error 和 payload 指定为源表字段,通过 SQL 正则函数、JSON 函数对字段进行解析,解析后的字段写入临时表,然后对临时表进行分析。

图片

SPL 方案: 在 Flink SLS Connector 中配置 SPL 语句,对数据进行清洗,Flink 中源表字段定义为清洗后的数据结构。

图片

从上述三种方案的原理不难看出,在需要数据清洗的场景中,在 SLS Connector 中配置 SPL 是一种更轻量化的方案,具有轻量化、易维护、易扩展的特点。

在日志数据弱结构化的场景中,SPL 方案既避免了方案一中创建临时中间 Logstore,也避免了方案二中在 Flink 中创建临时表,在离数据源更近的位置进行数据清洗,在计算平台关注业务逻辑,职责分离更加清晰。

如何在 Flink 中使用 SPL

接下来以一段弱结构化日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,仅需修改 SLS 源表配置,即可完成数据清洗与字段规整。

SLS 准备数据

  • 开通 SLS,在 SLS 创建 Project,Logstore,并创建具有消费 Logstore 的权限的账号 AK/SK。
  • 当前 Logstore 数据使用 SLS SDK 写入模拟数据,格式使用上述日志片段,其中包含 JSON、复杂字符串等弱结构化字段。

图片

预览 SPL 效果

在 Logstore 可以可以开启扫描模式,SLS SPL 管道式语法使用分隔符分割不同的指令,每次输入一个指令可以即时查看结果,然后增加管道数,渐进式、探索式获取最终结果。

图片

对上图中的 SPL 进行简单描述:

* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
 | parse-json Payload 
 | project-away Payload 
 | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
 | parse-json errorJson 
 | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
 | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
 | project-rename serviceHost="__tag__:__hostname__" 
 | extend scheduleType = json_extract_scalar(schedule, '$.type') 
 | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project
  • 1 行:project 指令:从原始结果中保留 Payload、error、tag:path、caller 字段,舍弃其他字段,这些字段用于后续解析。
  • 2 行:parse-json 指令:将 Payload 字符串展开为 JSON,第一层字段出现在结果中,包括 lastNotified、serviceUri、jobID 等。
  • 3 行:project-away 指令:去除原始 Payload 字段。
  • 4 行:parse-regexp 指令:按照 error 字段中的内容,解析其中的部分 JSON 内容,置于 errorJson 字段。
  • 5 行:parse-json 指令:展开 errorJson 字段,得到 httpCode、errorCode、errorMessage 等字段。
  • 6 行:parse-regexp 指令:通过正则表达式解析出 tag:path 种的文件名,并命名为 serviceName。
  • 7 行:parse-regexp 指令:通过正则表达式捕获组解析出 caller 种的文件名与行数,并置于 fileName、fileNo 字段。
  • 8 行:project-rename 指令:将 tag:hostname 字段重命名为serviceHost。
  • 9 行:extend 指令:使用 json_extract_scalar 函数,提取 schedule 中的 type 字段,并命名为 scheduleType。
  • 10 行:project 指令:保留需要的字段列表,其中 project 字段来自于 Payload。

创建 SQL 作业

在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

图片

在作业草稿中输入如下创建临时表的语句:

CREATE TEMPORARY TABLE sls_input_complex (
  errorCode STRING,
  errorMessage STRING,
  fileName STRING,
  fileNo STRING,
  httpCode STRING,
  requestID STRING,
  scheduleType STRING,
  serviceHost STRING,
  project STRING,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-02-01 10:30:00',
  'project' ='${project}',
  'logstore' ='${logtore}',
  'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
  );
  • 其中 a k , {ak}, ak,{sk}, p r o j e c t , {project}, project,{logstore} 需要替换为有消费权限的 AK 账号。
  • query 字段,替换为上述 SPL,注意在阿里云 Flink 控制台需要对单引号使用单引号转义,并且消除换行符。
  • SPL 最终得到的字段列表与 TABLE 中字段对应。

连续查询及效果

在作业中输入分析语句,查看结果数据:

SELECT * FROM sls_input_complex

点击右上角调试按钮,进行调试,可以看到 TABLE 中每一列的值,对应 SPL 处理后的结果。

图片

总结

为了适应弱结构化日志数据的需求,Flink SLS Connector 进行了升级,支持直接通过 Connector配置 SPL 的方式实现 SLS 数据源的清洗下推,特别是需要正则字段提取、JSON 字段提取、CSV 字段提取场景下,相较原数据加工方案和原 Flink SLS Connector 方案更轻量级,让数据清洗的职责更加清晰,在数据源端完成数据清洗工作,也可以减少数据的网络传输流量,使得到达 Flink 的数据已经是规整好的数据,可以更加专注在 Flink 中进行业务数据分析。

同时为了便于 SPL 验证测试,SLS 扫描查询也已支持使用 SPL 进行查询,可以实时看到 SPL 管道式语法执行结果。

参考链接:

[1] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-service

[2] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[3] 阿里云 Flink Connector SLShttps://help.aliyun.com/zh/flink/developer-reference/log-service-connector[4] SLS 扫描查询

https://help.aliyun.com/zh/sls/user-guide/scan-based-query-overview

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/414102.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot到底是如何进行自动配置的?

【1】从 spring.factories 配置文件中加载 EnableAutoConfiguration 自动配置类),获取的自动配 置类如图所示。 【2】若 EnableAutoConfiguration 等注解标有要 exclude 的自动配置类,那么再将这个自动配置类 排除掉; 【3】排除掉要 exclude …

【Azure 架构师学习笔记】-Azure Synapse -- Link for SQL 实时数据加载

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Synapse】系列。 前言 Azure Synapse Link for SQL 可以提供从SQL Server或者Azure SQL中接近实时的数据加载。通过这个技术,使用SQL Server/Azure SQL中的新数据能够几乎实时地传送到Synapse(…

猫头虎分享:Element UI Element Plus组件的安装及使用

博主猫头虎的技术世界 🌟 欢迎来到猫头虎的博客 — 探索技术的无限可能! 专栏链接: 🔗 精选专栏: 《面试题大全》 — 面试准备的宝典!《IDEA开发秘籍》 — 提升你的IDEA技能!《100天精通鸿蒙》 …

FinalShell控制远程Linux服务器(首先得自己已购买好Linux服务器并安装了对应的系统,这里是安装的centos系统)

1、电脑上需要安装FinalShell软件 可以到分享的链接中下载软件,然后双击点击下一步安装即可 链接:https://share.weiyun.com/Y6TrdDHp 密码:gbvyg62、建立远程连接 3、输入连接信息 4、显示连接主机成功,表示远程进入 5、输入…

解决i18n国际化可读性问题,傻瓜式webpack中文支持国际化插件开发

先来看最后的效果 问题 用过国际化i18n的朋友都知道,天下苦国际化久矣,尤其是中文为母语的开发者,在面对代码中一堆的$t(abc.def)这种一点也不直观毫无可读性的代码,根本不知道自己写了啥 (如上图,你看得出…

云里物里轻薄系列电子价签,如何革新零售?

云里物里的DS轻薄系列电子价签,凭借轻巧外观和强劲性能,为零售行业提供了更便捷的商品改价方案。这不仅是对纸质价标的替代,更以其安全性和可持续发展性,实现对零售行业的效率升级,让商家们轻松迎接数字化时代的挑战&a…

可视化图文报表

Apache Echarts介绍 Apache Echarts是一款基于Javascript的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。 官网&#xff1a;Apache ECharts 入门案例&#xff1a; <!DOCTYPE html> <html>…

Firefox Focus,一个 “专注“ 的浏览器

近期才开始使用 Firefox Focus&#xff0c;虽然使用频率其实并不高&#xff0c;基本上只有想到了才去用&#xff0c;但每次使用的体验都很不错。 Firefox Focus 这款浏览器大约在 2015 年首次发布&#xff0c;不同于一般版本的 Firefox&#xff0c;它主打“自动删除浏览记录”…

数据结构:树/二叉树

一、树的概念 逻辑结构&#xff1a;层次结构&#xff0c;一对多 节点&#xff1a;树中的一个数据元素根节点&#xff1a;树中的第一个节点&#xff0c;没有父节点孩子节点&#xff1a;该节点的直接下级节点父(亲)节点&#xff1a;该结点的直接上级节点兄弟节点&#xff1a;有…

机器学习-02-机器学习算法分类以及在各行各业的应用

总结 本系列是机器学习课程的第02篇&#xff0c;主要介绍机器学习算法分类以及在各行各业的应用 本门课程的目标 完成一个特定行业的算法应用全过程&#xff1a; 定义问题&#xff08;Problem Definition&#xff09; -> 数据收集(Data Collection) -> 数据分割(Data…

初识Maven

介绍&#xff1a; web后端开发技术ApacheMaven是一个项目管理和构建工具&#xff0c;它基于项目对象模型&#xff08;POM&#xff09;的概念&#xff0c;通过一小段描述信息来管理项目的构建。安装&#xff1a;http://maven.apache.org/ Apache软件基金会&#xff0c;成立于19…

新能源汽车出海潮起,智能驾驶方案成差异化优势

2023年&#xff0c;中国汽车产销分别达3016.1万辆和3009.4万辆&#xff0c;巨大的规模之下是激烈的品牌竞争。由整车企业引领&#xff0c;汽车产业链的电动化智能化转型逐渐倒逼企业自行开拓成长空间。转型力度偏小、产品更新较慢的海外市场&#xff0c;成为蕴含金矿的待开掘目…

电子电器架构新趋势 —— 最佳着力点:域控制器

电子电器架构新趋势 —— 最佳着力点&#xff1a;域控制器 我是穿拖鞋的汉子&#xff0c;魔都中坚持长期主义的汽车电子工程师&#xff08;Wechat&#xff1a;gongkenan2013&#xff09;。 老规矩&#xff0c;分享一段喜欢的文字&#xff0c;避免自己成为高知识低文化的工程师…

mac电脑监控软件哪个好

在Mac电脑使用日益普及的今天&#xff0c;企业对于Mac终端的安全管理需求也日益增长。Mac电脑监控软件作为一种有效的管理工具&#xff0c;能够帮助企业提高数据安全性和员工工作效率。 在众多Mac电脑监控软件中&#xff0c;域智盾软件以其卓越的功能和性能脱颖而出&#xff0c…

【办公类-21-04】20240227单个word按“段落数”拆分多个Word(三级育婴师操作参考题目 有段落文字和表格 1拆13份)

作品展示 背景需求&#xff1a; 最近学育婴师&#xff0c;老师发了一套doc操作参考 但是老师是一节节授课的&#xff0c;每节都有视频&#xff0c;如果做在一个文档里&#xff0c;会很长很长&#xff0c;容易找不到。所以我需要里面的单独文字的docx。 以前的方法是 1、打开源…

论文阅读:SOLOv2: Dynamic, Faster and Stronger

目录 概要 Motivation 整体架构流程 技术细节 小结 论文地址&#xff1a;[2003.10152] SOLOv2: Dynamic and Fast Instance Segmentation (arxiv.org) 代码地址&#xff1a;GitHub - WXinlong/SOLO: SOLO and SOLOv2 for instance segmentation, ECCV 2020 & NeurIPS…

逆变器专题(10)-电流环控制参数设计

相应仿真原件请移步资源下载 对跟网型逆变器来说&#xff0c;电流环的PI参数设计尤其重要 如上图所示为电流环解耦控制模型 而电压、电流采样和计算都是在开关周期的中间时刻进行&#xff0c;SVPWM调制出的磁矢量需要在一个开关周期进行作用&#xff0c;因此&#xff0c;整个逆…

2024年腾讯云4核8G12M配置的轻量服务器同时支持多大访问量?

腾讯云4核8G服务器支持多少人在线访问&#xff1f;支持25人同时访问。实际上程序效率不同支持人数在线人数不同&#xff0c;公网带宽也是影响4核8G服务器并发数的一大因素&#xff0c;假设公网带宽太小&#xff0c;流量直接卡在入口&#xff0c;4核8G配置的CPU内存也会造成计算…

swagger-ui.html报错404,解决办法

swagger-ui.html报错404,解决办法&#xff01;现在后端开发项目中&#xff0c;为了节省时间&#xff0c;使用swagger插件&#xff0c;可以方便的快捷生成接口文档。但是如果你在请求前端页面路径比如&#xff1a;http://127.0.0.1:7777/swagger-ui.html。找不到。那是因为你的配…

Nginx网络服务六-----IP透传、调度算法和负载均衡

1.实现反向代理客户端 IP 透传 就是在日志里面加上一个变量 Module ngx_http_proxy_module [rootcentos8 ~]# cat /apps/nginx/conf/conf.d/pc.conf server { listen 80; server_name www.kgc.org; location / { index index.html index.php; root /data/nginx/html/p…