Flink-SQL 写入PostgreSQL 问题汇总

1.主键字段为空问题

  • 错误信息
org.apache.flink.table.api.TableException: Column 'bus_no' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.

在这里插入图片描述

  • 问题原因
    	sink 表定义了主键,flink-sql在使用jdbc 插入时,定义的主键中的属性存在空值
    PRIMARY  KEY (col,col2,col3,col4,col5,col6,col7) NOT ENFORCED
    
  • 解决
    确定主键属性是否有空值,若为空则确定是否可作为主键属性(若正确则将其置为默认值);若不符合业务情况,则重新定义主属性集合,确保不出现空值

2.flink-sql jdbc 并发写入Mysql出现死锁

  • 错误信息
 org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:240)
java.sql.BatchUpdateException: Batch entry 0 INSERT INTO dm_hljy.dm_hljy_sims_day_payment_info(sign_year_type, pay_source, campus_name, school_name, depart_name, pay_time, pay_channel_name, day_fee_amounts) VALUES ('2033', '3', '教育 ', ' 初级中学', ' 初级中学', '2022-02-18 00:00:00+08'::timestamp, '微信', '61310.00'::numeric) ON DUPLICATE KEY UPDATE day_fee_amounts=VALUES(day_fee_amounts) was aborted: ERROR: dn_6003_6004: deadlock detected
  Detail: Process 139972369676032 waits for ShareLock on transaction 1091179704; blocked by process 139976955991808.
Process 139976955991808 waits for ShareLock on transaction 1091179703; blocked by process 139972369676032.
  Hint: See server log for query details.  Call getNextException to see other errors in the batch.
	at com.huawei.gauss200.jdbc.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:171) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:586) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.jdbc.PgStatement.executeBatch(PgStatement.java:883) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1580) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:101) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:266) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:236) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:215) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57) ~[flink-connector-jdbc-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer.processElement(SinkUpsertMaterializer.java:128) ~[flink-table-runtime-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:524) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:216) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:812) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) [flink-dist-1.15.0-h0.cbu.mrs.320.r33.jar:1.15.0-h0.cbu.mrs.320.r33]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: com.huawei.gauss200.jdbc.util.PSQLException: ERROR: dn_6003_6004: deadlock detected
  Detail: Process 139972369676032 waits for ShareLock on transaction 1091179704; blocked by process 139976955991808.
Process 139976955991808 waits for ShareLock on transaction 1091179703; blocked by process 139972369676032.
  Hint: See server log for query details.
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2856) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2587) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	at com.huawei.gauss200.jdbc.core.v3.QueryExecutorImpl.executeBatch(QueryExecutorImpl.java:575) ~[huaweicloud-dws-jdbc-8.1.1.1-200.jar:?]
	... 30 more

在这里插入图片描述

  • 错误原因
flink-sql在写入sql时定义了主键,flink-sql进行upser 操作(主键记录不存在则进行insert,存在则进行update)
flink-sql 插入时,会根据主键对sink表进行查询,若并发度大于1,则可能存在两个及以上线程查询同一条记录,出现死锁情况
  • 解决
 设置flink-sql jdbc 并发度为1,(flink jdbc 并发度跟随flink 作业并发度;也可以单独设置)
设置flink-sql 并发度 : parallelism.default: 1
也可以单独设置jdbc 并发度: (不跟随flink作业并发度) 
WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:gaussdb://dws-hl-datalake.dws.myhuaweiclouds.com:8000/hl_datamart',
  'table-name' = 'dm_hljy.dm_hljy_sims_security_send_back_student_info',
   'connection.max-retry-timeout' = '600s',
  'sink.max-retries' = '6',
   'sink.parallelism'= '1',
   'username' = '用户名',
   'password' = ‘密码'
 );

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/32817.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

罗技k380键盘教程

在智能手机和平板电脑上享受台式电脑般舒适便捷的输入体验。罗技蓝牙™ 多设备键盘 K380 是一款小巧独特的键盘,让您在家中任何地方都能使用个人设备进行沟通和创作。 借助便捷的易于切换™ 按钮,可以通过蓝牙™ 无线技术同时连接最多三台设备&#xff…

【实用技巧】使用USB数据线向亚马逊kindle导入电子书

一、内容简介 本文主要介绍如何使用USB数据线向亚马逊kindle阅读器导入电子书。 二、所需原料 笔记本电脑、Kindle阅读器、Kindle适配的USB-a数据线。 三、导入方法 1、使用USB-a数据线将Kindle阅读器与电脑连接。 2、找到Kindle文件夹-documents-Downloads-Items1目录。…

Django框架实现简单的接口开发

前提创建一个Django项目&#xff0c;目录如下&#xff1a; Django框架上进行GET请求接口开发示例: 1.在上面项目结构目录Template下&#xff0c;新建一个login.html页面&#xff0c;定义表单提交请求的方式为post&#xff0c;具体代码如下。 <!DOCTYPE HTML> <html …

freemarker 使用word模板赋值

1. 引包<dependency><groupId>org.freemarker</groupId><artifactId>freemarker</artifactId><version>2.3.28</version></dependency>word文档工具类import freemarker.template.Configuration; import freemarker.template.…

快来看看Java在编程语言中的优势与特性吧

作者主页&#xff1a;paper jie的博客_CSDN博客-C语言,算法详解领域博主 本文作者&#xff1a;大家好&#xff0c;我是paper jie&#xff0c;感谢你阅读本文&#xff0c;欢迎一建三连哦。 其他专栏&#xff1a;《系统解析C语言》《C语言》《C语言-语法篇》 内容分享&#xff1a…

CentOS 7.9 安装 Jenkins

CentOS 7.9 安装 Jenkins 文章目录 CentOS 7.9 安装 Jenkins一、概述二、安装1、安装 OpenJDK2、安装 Jenkins3、启动 Jenkins4、给 Jenkins 放行端口 三、初始化 Jenkins 配置1、访问2、解锁 Jenkins3、配置清华大学的源地址4、安装插件5、创建管理员用户6、完成安装 四、功能…

【C++】C++关于异常的学习

文章目录 C语言传统的处理错误的方式一、异常的概念及用法二、自定义异常体系总结 C语言传统的处理错误的方式 传统的错误处理机制&#xff1a; 1. 终止程序&#xff0c;如 assert &#xff0c;缺陷&#xff1a;用户难以接受。如发生内存错误&#xff0c;除 0 错误时就会终止…

三相一次重合闸程序逻辑原理(二)

在手动合闸至故障线路或手动分闸及保护或自动装置要求不允许重合闸&#xff08;如母线、变压器保护及低频减载动作&#xff09;等情况下&#xff0c;闭锁重合闸的输入开关量触点接通&#xff0c;H4输出“1”&#xff0c;非门Z4输出“0”&#xff0c;计数器清零&#xff08;CD0&…

基于Java+SpringBoot+Vue前后端分离网课在线学习观看系统

博主介绍&#xff1a;✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

Spring Boot banner详解

Spring Boot 3.x系列文章 Spring Boot 2.7.8 中文参考指南(一)Spring Boot 2.7.8 中文参考指南(二)-WebSpring Boot 源码阅读初始化环境搭建Spring Boot 框架整体启动流程详解Spring Boot 系统初始化器详解Spring Boot 监听器详解Spring Boot banner详解 自定义banner Spring …

Kubernetes API Server源码学习(二):OpenAPI、API Resource的装载、HTTP Server具体是怎么跑起来的?

本文基于Kubernetes v1.22.4版本进行源码学习 6、OpenAPI 1&#xff09;、OpenAPI的作用 OpenAPI是由Swagger发展而来的一个规范&#xff0c;一种形式化描述Restful Service的语言&#xff0c;便于使用者理解和使用一个Service。通过OpenAPI规范可以描述一个服务&#xff1a;…

2024考研408-计算机组成原理第四章-指令系统学习笔记

文章目录 前言一、指令系统现代计算机的结构1.1、指令格式1.1.1、指令的定义1.1.2、指令格式1.1.3、指令—按照地址码数量分类①零地址指令②一地址指令&#xff08;1个操作数、2个操作数情况&#xff09;③二地址指令④三地址指令⑤四地址指令 1.1.4、指令-按照指令长度分类1.…

基于Java高校教师科研信息展示网站设计实现(源码+lw+部署文档+讲解等)

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

[Hadoop] 期末答辩问题准备

0.相关概念 1.什么是NameNode&#xff1f; NameNode是整个文件系统的管理节点&#xff0c;它维护着整个文件系统的文件目录树&#xff0c;文件/目录的元信息和每个文件对应的数据块列表。并接收用户的操作请求。 2.SecondaryNameNode的主要作用&#xff1f; SecondaryNameN…

学习Angular的编程之旅

目录 1、简介 2、特点 2.1 横跨多种平台 2.2 速度与性能 2.3 美妙的工具 3、Angular 应用&#xff1a;知识要点 3.1 组件 3.2 模板 3.3 依赖注入 4、与其他框架的对比 1、简介 Angular 是一个应用设计框架与开发平台&#xff0c;旨在创建高效而精致的单页面应用。 A…

别再用查询count,判断数据是否存在了

目录 一、目前多数人的写法 二、优化方案 三、总结 大家在实际的开发过程中&#xff0c;会根据某些条件&#xff0c;从数据库表中查询出是否存在符合该条件的数据。无论是刚入行的程序员小白&#xff0c;还是久经沙场多年的程序员老白&#xff0c;都是一如既往的SELECT count(*…

【探索 Kubernetes|作业管理篇 系列 11】控制器的核心功能

前言 大家好&#xff0c;我是秋意零。 上一篇结束了 Pod 对象的内容。 今天要探讨的内容是 “控制器”&#xff0c;它是 Kubernetes 编排最核心的功能。理解了 “控制器”&#xff0c;你就能理解 Deployment、StatefulSet、DaemontSet、Job、CroJob 控制器对象。 最近搞了一…

数据结构之复杂度分析

1、大 O 复杂度表示法 算法的执行效率&#xff0c;粗略地讲&#xff0c;就是算法代码执行的时间 这里有段非常简单的代码&#xff0c;求 1,2,3…n 的累加和。看如何来估算一下这段代码的执行时间 int cal(int n) {int sum 0;int i 1;for (; i < n; i) {sum sum i;}ret…

pg报错attempted to delete invisible tuple

问题描述 postgresql数据库执行delete报错&#xff1a;attempted to delete invisible tuple&#xff0c;执行同样条件的select不报错 delete from lzltab1; select count(*) from lzltab1;执行全表删除和全表查询的结果&#xff1a; M# delete from lzltab1; ERROR: 5500…

【Unity之IMGUI】—位置信息类和控件基类的封装

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a; ⭐…