Flink SQL 基于Update流出现空值无法过滤问题

问题背景

  • 问题描述
基于Flink-CDC ,Flink SQL的实时计算作业在运行一段时间后,突然发现插入数据库的计算结果发生部分主键属性发生失败,导致后续计算结果无法插入,
超过失败次数失败的情况
  • 问题报错
	Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dws_table_name (op_date, school_year, campus_name, school_name, depart_name, total_opfare, ids, update_time) VALUES ('2024-03-11 00:00:00+08', '2023', 'xxxx', 'xxxx学校', 'xxxx小学部', '203333300000', '57', '2024-03-21 09:31:08.47+08') ON DUPLICATE KEY UPDATE school_year=VALUES(school_year), total_opfare=VALUES(total_opfare), ids=VALUES(ids), update_time=VALUES(update_time) was aborted: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint  Call getNextException to see other errors in the batch.
		at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332]
		... 1 more
	Caused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6007_6008: null value in column "depart_name" violates not-null constraint
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
		at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:159) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
		at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_332]
		at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) ~[?:1.8.0_332]
		at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_332]
		at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_332

在这里插入图片描述

定位

定位思路

1.方向一:怀疑数据库插入存在数据处理时,造成数据处理出现空值的情况,即数据本身不为空,但是数据插入却出现了空
2.方向二:Flink-SQL在消费kafka数据时存在了空值,故加工的数据计算结果存在空值

定位过程

  • 因插入数据库定位比较麻烦,且数据库已经设置该字段为主属性,故出现插入时处理为空值的概率较小。故先从较为简单的Flink SQL查询数据
  • 定位方法一,查询该字段为空的记录,待作业执行完成后,未查询到空值对应记录
 select  select * from table_name where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 因考虑到使用Flink-CDC进行变更数据捕获,故对应的update流存在-U,+U,-D,+I记录,因此随着插入记录存在空值被记录进去的情况,故采用view的方式,先将宽表的加工、关联方式创建为view,然后进行空值的过滤。实施如下
create view view_prd as 
select a.* ,b.*  from a join b on a.id = b.id

    select * from view_prd where depart_name is null or depart_name = '' or char_length(depart_name) = 0;
  • 通过查询结果,发现存在最后一条记录存在空值的原因,往源头定位,发现该字段之前为空,后面进行更新填充到值出现-U记录,导致数据插入持续失败
    在这里插入图片描述

原因

  • 因为flink-SQL消费的数据时kafka topic,flink以upsert-kafka形式的connector进行写入,故存在changelog 流中数据更新存在-U,+U的记录(按照Key进行区分唯一条记录),value 为空(-U)的记录kafka也,导致出现空值,
    在这里插入图片描述

解决

通过在DWS宽表创建一层View(如上),在写入DWS宽表的kafka topic之前,现将该字段空值过滤,即可排除空值涉及记录被纳入结果指标计算的范围中

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/493460.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL面试汇总(一)

MySQL 如何定位慢查询 如何优化慢查询 索引及其底层实现 索引是一个数据结构,可以帮助MySQL高效获取数据。 聚簇索引和非聚簇索引 覆盖索引 索引创建原则 联合索引

前端学习<二>CSS基础——04-CSS选择器:伪类

伪类(伪类选择器) 伪类:同一个标签,根据其不同的种状态,有不同的样式。这就叫做“伪类”。伪类用冒号来表示。 比如div是属于box类,这一点很明确,就是属于box类。但是a属于什么类?…

前端js计算日期 实现倒计时效果

<!DOCTYPE html> <html lang"en"> <head> <meta charset"UTF-8"> <title>倒计时示例</title> <style> #countdown { font-size: 24px; } #countdown span { margin-right: 10p…

未来制造:机器人行业新质生产力提升策略

机器人行业新质生产力提升咨询方案 一、机器人行业目前发展现状及特点&#xff1a; 创新活跃、应用广泛、成长性强。 二、机器人企业发展新质生产力面临的痛点&#xff1a; 1、高端人才匮乏 2、核心技术受限 3、竞争日益国际化 4、成本控制挑战 5、用户体验提升需求 三…

Linux软件安装:源代码与Tarball

文章目录 Linux源码包的安装与升级什么是源代码、编译器与可执行文件什么是函数库什么是make与configure什么是Tarball的软件如何安装与升级软件 使用gcc进行编译的简单实例单一程序&#xff1a;打印 Hello World主、子程序链接&#xff1a;子程序的编译调用外部函数库&#xf…

三轴工作台激光焊接机:实现高精度、高效率焊接的新选择

三轴工作台激光焊接机是一种先进的焊接设备&#xff0c;结合了激光焊接技术与三轴工作台的运动控制&#xff0c;实现了焊接过程的高效、精准与自动化。这种设备主要利用激光束的高能量密度和高速度特性&#xff0c;使工件在熔化的同时快速冷却凝固&#xff0c;从而达到高质量的…

AXI Memory Mapped to PCI Express学习笔记(一)——PCIe事务

1 PCIe事务 AXI事务对于PCIe来说&#xff0c;主要涉及到在AXI总线和PCIe总线之间进行数据交换和通信的过程。在PCIe系统中&#xff0c;AXI总线作为一个连接不同组件的桥梁&#xff0c;可以实现高效的数据传输和事务处理。 AXI事务通常包括读事务和写事务。在读事务中&#xf…

Git的使用记录+坑的处理

上学期也使用过git拉取gitee的项目进行远程办公,但是因为那个项目太赶,所以没有记录是如何使用的. 现在这个项目需要拉取gitlab上面的资源,于是再次使用了git,就记录一下基本操作和一些问题. 1.基本命令 git clone <repository-url> [<repository-url>代表远程仓库…

Java中锁的分类

引言 在多线程并发编程场景中,锁作为一种至关重要的同步工具,承担着协调多个线程对共享资源访问秩序的任务。其核心作用在于确保在特定时间段内,仅有一个线程能够对资源进行访问或修改操作,从而有效地保护数据的完整性和一致性。锁作为一种底层的安全构件,有力地防止了竞…

Qt开发(2)——在已有VS项目中配置Qt

在之前的Qt开发学习中&#xff0c;基本都是在Qt Creator中创建一个Qt项目&#xff0c;或者即便是在VS中也是直接新建一个Qt项目。但很少有记录如何在已有的C项目中添加Qt,这就好比我有个项目已经开发完了&#xff0c;现在又说加个Qt界面的功能。这篇文章就是记录如何在已有项目…

Programming Abstractions in C阅读笔记:p338-p346

《Programming Abstractions in C》学习第80天&#xff0c;p338-p346&#xff0c;总计9页。 一、技术总结 栈的实现包括入栈、出栈、判断栈是否为满&#xff0c;判断栈是否为空等。作者结合RPN计算器来实现&#xff0c;稍显无聊。 /** File: rpncalc.c* ---------------* Th…

VMware和Xshell连接

1.开启虚拟机 2.使用管理员账户&#xff0c;点击未列出 3.输入用户名密码 4.点击编辑虚拟网络编辑器 5.记住自己的网关和IP地址 6.打开终端 7.输入命令&#xff0c;vim / etc / sysconfig / network -scripts / ifcfg-ens33 回车 8.修改图中两处按“ I ”键进入编辑 d…

号码采集协议讲解

仅供学习研究交流使用 需要的进去拿源码或者成品

k8s1.28.8版本安装prometheus并持久化数据

本文参考 [k8s安装prometheus并持久化数据_/prometheus-config-reloader:-CSDN博客](https://blog.csdn.net/vic_qxz/article/details/119598466)前置要求: 已经部署了NFS或者其他存储的K8s集群. 这里注意networkpolicies网络策略问题&#xff0c;可以后面删除这个策略&#x…

Mysql连接查询

目录 一、连接查询 1.1内连接 1.2 左连接 1.3 右连接 二、存储过程 2.1 简介 2.2 优点 2.3语法 2.4 创建与调用存储过程 ​编辑2.4.1 查看存储过程 2.5 存储过程的参数 2.6 删除存储过程 一、连接查询 MySQL 的连接查询&#xff0c;通常都是将来自两个或多个表的记录…

如何利用nginx在windows系统上搭建一个文件服务器

1&#xff1a;先下载windows版的nginx 官网 http://nginx.org/ 下载完后注意端口号&#xff08;默认端口号为&#xff1a;80&#xff09;是否被占用 启动nginx服务 地址为localhost的 如果出现 Welcome to nginx 就说明启动成功 2&#xff1a;然后进入conf文件里修改配置 …

Mysql数据库——高级SQL语句补充

目录 一、子查询——Subquery 1.环境准备 2.In——查询已知的值的数据记录 2.1子查询——Insert 2.2子查询——Update 2.3子查询——Delete 3.Not In——表示否定&#xff0c;不在子查询的结果集里 3.Exists——判断查询结果集是否为空 4.子查询——别名 二、视图—…

GPIO端口的BSRR的使用

BSRR 只写寄存器 既能控制管脚为高电平&#xff0c;也能控制管脚为低电平。对寄存器高 16bit 写1 对应管脚为低电平&#xff0c;对寄存器低16bit写1对应管脚为高电平。写 0 ,无动作 首先看GPIOC的定义 接着看这个类型的定义 可以看到BSRR为无符号的32位的整形 接下来看GPIO_Pi…

【旅游】泉州攻略v1.0.0

一、泉州古城 泉州市距离深圳大约520公里&#xff0c;从深圳北站出发&#xff0c;高铁大约3小时30分。 到达泉州西站后&#xff0c;往东南方向大约8公里&#xff0c;就可以到达主要的旅游景点泉州古城。 古城很适合使用一天玩耍&#xff0c;核心路线如下&#xff1a; 一路的景…

python安装与使用

1安装 1.0下载 从官网下载安装包欢迎来到 Python.orghttps://www.python.org/ 1.1安装 双击安装包 将 图中选项勾选 之后如图 在点击 等待安装&#xff0c;安装之后关闭&#xff08;Close&#xff09; 2实现第一个python程序 2.0打开python运行环境 安装之后在开始菜单…