Flink实现数据写入MySQL

 先准备一个文件里面数据有:

a, 1547718199, 1000000
b, 1547718200, 1000000
c, 1547718201, 1000000
d, 1547718202, 1000000
e, 1547718203, 1000000
f, 1547718204, 1000000
g, 1547718205, 1000000
h, 1547718210, 1000000
i, 1547718210, 1000000
j, 1547718210, 1000000

 scala代码:

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._

  case class SensorReading(name: String, timestamp: Long, salary: Double)
  object a1 {
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      //数据源
      val dataStream: DataStream[String] = env.readTextFile("D:\\wlf.备份24.1.3\\wlf\\ideaProgram\\bbbbbb\\src\\main\\resources\\salary.txt")
      val stream = dataStream.map(data => {
        val splited = data.split(",")
        SensorReading(splited(0), splited(1).trim.toLong, splited(2).trim.toDouble)
      })
      stream.addSink( new JDBCSink() )
      env.execute("  job")
    }
  }


  class JDBCSink() extends RichSinkFunction[SensorReading]{
    // 定义sql连接、预编译器
    var conn: Connection = _
    var insertStmt: PreparedStatement = _
    var updateStmt: PreparedStatement = _
    // 初始化,创建连接和预编译语句
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      conn = DriverManager.getConnection("jdbc:mysql://bigdata1:3306/flink?serverTimezone=UTC", "root", "123456")
      insertStmt = conn.prepareStatement("INSERT INTO salary_table (name, salary) VALUES (?,?)")
      updateStmt = conn.prepareStatement("UPDATE salary_table SET salary = ? WHERE name = ?")
    }
    override def invoke(value: SensorReading): Unit = {
      // 执行更新语句
      updateStmt.setString(1, value.name)
      updateStmt.setDouble(2, value.salary)
      updateStmt.execute()
      // 如果update没有查到数据,那么执行插入语句
      if( updateStmt.getUpdateCount == 0 ){
        insertStmt.setString(1, value.name)
        insertStmt.setDouble(2, value.salary)
        insertStmt.execute()
      }
    }
    // 关闭时做清理工作
    override def close(): Unit = {
      insertStmt.close()
      updateStmt.close()
      conn.close()
    }
}

MySQL中查看表 :

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/351490.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Windows Server 安装 Docker

一、简介 Docker 不是一个通用容器工具,它依赖运行的 Linux 内核环境。Docker 实质上是在运行的 Linux 服务器上制造了一个隔离的文件环境,所以它执行的效率几乎等同于所部署的 Linux 主机服务器性能。因此,Docker 必须部署在 Linux 内核系统…

【保驾护航】HarmonyOS应用开发者基础认证-题库

通过系统化的课程学习,熟练掌握DevEco Studio,ArkTS,ArkUI,预览器,模拟器,SDK等HarmonyOS应用开发的关键概念,具备基础的应用开发能力。 考试说明 1、考试需实名认证,请在考前于个…

【LeetCode: 135. 分发糖果 + 贪心】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

嵌入式-stm32-江科大-OLED调试工具

文章目录 一:OLED调试工具1.1 OLED显示屏介绍1.2 实验:在OLED显示屏的使用1.3 自己新增功能测试道友:今天没有开始的事,明天绝不会完成。 一:OLED调试工具 1.1 OLED显示屏介绍 学习任何一门语言就需要进行调试&#…

Java基础进阶03-注解和单元测试

目录 一、注解 1.概述 2.作用 3.自定义注解 (1)格式 (2)使用 (3)练习 4.元注解 (1)概述 (2)常见元注解 (3)Target &#x…

第13次修改了可删除可持久保存的前端html备忘录:删除按钮靠右,做了一个背景主题:现代深色

第13次修改了可删除可持久保存的前端html备忘录&#xff1a;删除按钮靠右&#xff0c;做了一个背景主题&#xff1a;现代深色 备忘录代码 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta name"vi…

LFU算法

LFU算法 Least Frequently Used&#xff08;最不频繁使用&#xff09; Leetcode有原题&#xff0c;之前手写过LRU&#xff0c;数据结构还是习惯于用java实现&#xff0c;实现是copy的评论题解。 题解注释写的很清楚 大致就是说LFUCache类维护一个存放node的map&#xff0c;同…

立创EDA学习:设计收尾工作

布线整理 ShiftM&#xff0c;关闭铺铜显示 调整结束后再使用快捷键”ShiftM“打开铺铜 过孔 在空白区域加上一些GND过孔&#xff0c;连接顶层与底层的铺铜。放置好”过孔“后&#xff0c;隐藏铺铜&#xff0c;观察刚才放置的过孔有没有妨碍到其他器件 调整铺铜 先打开铺铜区&…

php mysql字段默认值使用问题

前提是使用了事务&#xff0c;在第一个阶段 是A表操作保存&#xff0c;第二阶段操作B表&#xff0c;操作B表的时候使用了A表的一个字段&#xff0c;这个字段在第一阶段没有设置值&#xff0c;保存的时候使用字段默认值。 【这种情况 最好是在第一阶段 把后面要使用的字段设置好…

C#在图片上输出文字和保存

winform&#xff0c;图片控件&#xff0c;加载一个图片&#xff0c;在图片上输出文字&#xff1b; 输出文字的代码如下&#xff1b; private void pictureBox1_Paint(object sender, PaintEventArgs e){Graphics g1 e.Graphics;g1.DrawString("测试", this.Font, B…

物联网IOT视频设备如何快速对接阿里云生活物联网(Link Visual)并成功上云?

原文永久更新地址&#xff1a;https://www.yundashi168.com/472.html 文章来源&#xff1a;猿视野 如果有图片看不清楚&#xff0c;加载不出来&#xff0c;请阅读原文。 什么是Link Visual、 Link Visual是生活物联网平台针对视频产品推出的增值服务&#xff0c;提供视频数据上…

指针的深入理解1

1.如何理解指针 假设有一栋宿舍楼&#xff0c;把你放在楼里&#xff0c;楼上有100个房间&#xff0c;但是房间没有编号&#xff0c;你的一个朋友来找你玩&#xff0c; 如果想找到你&#xff0c;就得挨个房子去找&#xff0c;这样效率很低&#xff0c;但是我们如果根据楼层和楼…

C#,恩廷格尔组合数(Entringer Number)的算法与源程序

恩廷格尔组合数&#xff08;Entringer Number&#xff09;组合数学的序列数字之一。 E&#xff08;n&#xff0c;k&#xff09;是{1&#xff0c;2&#xff0c;…&#xff0c;n1}的排列数&#xff0c;从k1开始&#xff0c;先下降后上升。 计算结果&#xff1a; 源代码&#xf…

用Excel辅助做数独

做数独游戏的时候&#xff0c;画在纸上很容易弄花眼&#xff0c;所以我考虑用Excel辅助做一个。 界面如下&#xff1a; 按下初始化表格区域按钮&#xff0c;会在所有单元格中填充“123456789”。如下图&#xff1a; 当某个单元格删除得只剩一个数字时&#xff0c;会将同一行、…

[UI5 常用控件] 03.Icon, Avatar,Image

文章目录 前言1. Icon2. Avatar2.1 displayShape2.2 initials2.3 backgroundColor2.4 Size2.5 fallbackIcon2.6 badgeIcon2.7 badgeValueState2.8 active 3. Image 前言 本章节记录常用控件Title,Link,Label。 其路径分别是&#xff1a; sap.m.Iconsap.m.Avatarsap.m.Image 1…

C++面试:散列表

目录 1. 散列表的基本概念 散列表的定义 散列函数 哈希冲突 2. 处理冲突的方法 链地址法&#xff08;Separate Chaining&#xff09; 开放地址法 再散列 3. 散列表的性能分析 1. 平均查找长度&#xff08;ASL&#xff09; 2. 负载因子&#xff08;Load Factor&#…

CAD-autolisp(二)——选择集、命令行设置对话框、符号表

目录 一、选择集1.1 选择集的创建1.2 选择集的编辑1.3 操作选择集 二、命令行设置对话框2.1 设置图层2.2 加载线型2.3 设置字体样式2.4 设置标注样式&#xff08;了解即可&#xff09; 三、符号表3.1 简介3.2 符号表查找3.2 符号表删改增 一、选择集 定义&#xff1a;批量选择…

go语言(十八)---- goroutine

一、goroutine package mainimport ("fmt""time" )func main() {//用go创建承载一个形参为空&#xff0c;返回值为空的一个函数go func() {defer fmt.Println("A.defer")func() {defer fmt.Println("B.defer")//退出当前goroutinefmt…

《WebKit 技术内幕》学习之十五(6):Web前端的未来

6 Chromium OS和Chrome的Web应用 6.1 基本原理 HTML5技术已经不仅仅用来编写网页了&#xff0c;也可以用来实现Web应用。传统的操作系统支持本地应用&#xff0c;那么是否可以有专门的操作系统来支持Web应用呢&#xff1f;当然&#xff0c;现在已经有众多基于Web的操作系统&…

跟无神学AI之Prompt

在大模型时代会写prompt变得很重要。 Prompt翻译为中文为提示词&#xff0c;在大模型的特定领域指的是大模型使用者给大模型提交的一种有一定格式的交互命令&#xff0c;让我们看看科大讯飞的大模型给出的答案—— Prompt是一种向人工智能模型提供的输入文本或指令&#xff0…