一、写在前面
在实际的生产环境中,我们经常会把Flink处理的数据写入MySQL、Doris等数据库中,下面以MySQL为例,使用JDBC的方式将Flink处理的数据实时写入MySQL。
二、代码示例
2.1 版本说明
<flink.version>1.14.6</flink.version>
<spark.version>2.4.3</spark.version>
<hadoop.version>2.8.5</hadoop.version>
<hbase.version>1.4.9</hbase.version>
<hive.version>2.3.5</hive.version>
<java.version>1.8</java.version>
<scala.version>2.11.8</scala.version>
<mysql.version>8.0.22</mysql.version>
<scala.binary.version>2.11</scala.binary.version>
2.2 导入相关依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!--mysql连接器依赖-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
2.3 连接数据库,创建表
mysql> CREATE TABLE `ws` (
`id` varchar(100) NOT NULL
,`ts` bigint(20) DEFAULT NULL
,`vc` int(11) DEFAULT NULL, PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2.4 创建POJO类
package com.flink.POJOs;
import java.util.Objects;
/**
* TODO POJO类的特点
* 类是公有(public)的
* 有一个无参的构造方法
* 所有属性都是公有(public)的
* 所有属性的类型都是可以序列化的
*/
public class WaterSensor {
//类的公共属性
public String id;
public Long ts;
public Integer vc;
//无参构造方法
public WaterSensor() {
//System.out.println("调用了无参数的构造方法");
}
public WaterSensor(String id, Long ts, Integer vc) {
this.id = id;
this.ts = ts;
this.vc = vc;
}
//生成get和set方法
public void setId(String id) {
this.id = id;
}
public void setTs(Long ts) {
this.ts = ts;
}
public void setVc(Integer vc) {
this.vc = vc;
}
public String getId() {
return id;
}
public Long getTs() {
return ts;
}
public Integer getVc() {
return vc;
}
//重写toString方法
@Override
public String toString() {
return "WaterSensor{" +
"id='" + id + '\'' +
", ts=" + ts +
", vc=" + vc +
'}';
}
//重写equals和hasCode方法
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WaterSensor that = (WaterSensor) o;
return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
}
@Override
public int hashCode() {
return Objects.hash(id, ts, vc);
}
}
//scala的case类?
2.5 自定义map函数
package com.flink.POJOs;
import org.apache.flink.api.common.functions.MapFunction;
/**
 * Parses one comma-separated text line into a {@link WaterSensor}.
 *
 * <p>The first three fields are read as {@code id,ts,vc}; any further fields are ignored.
 * Whitespace around the two numeric fields (including a trailing carriage return) is stripped
 * before parsing. The {@code id} field is passed through unchanged.
 */
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    /** Number of leading fields consumed from each input line. */
    private static final int FIELD_COUNT = 3;

    /**
     * Converts a raw input line into a sensor record.
     *
     * @param value input line in the form {@code id,ts,vc}
     * @return the parsed record
     * @throws IllegalArgumentException if the line has fewer than three fields
     * @throws NumberFormatException if {@code ts} is not a long or {@code vc} is not an int
     */
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] fields = value.split(",");
        // Fail with a message that shows the offending line instead of a bare index error.
        if (fields.length < FIELD_COUNT) {
            throw new IllegalArgumentException(
                    "Expected at least " + FIELD_COUNT
                            + " comma-separated fields (id,ts,vc) but got: " + value);
        }
        return new WaterSensor(
                fields[0],
                Long.valueOf(fields[1].trim()),
                Integer.valueOf(fields[2].trim()));
    }
}
2.5 Flink2MySQL
package com.flink.DataStream.Sink;
import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Flink job that writes a stream to MySQL through the JDBC connector.
 *
 * <p>Reads text lines from a socket on {@code localhost:8888}, maps each line to a
 * {@link WaterSensor} with {@link WaterSensorMapFunction}, and inserts the records into the
 * {@code ws} table.
 */
public class flinkSinkJdbc {
    /** Insert statement; columns are named so it does not depend on the table's column order. */
    private static final String INSERT_SQL = "insert into ws (id, ts, vc) values (?,?,?)";

    /**
     * Builds and runs the pipeline: socket source -> map -> JDBC sink.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment.
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);

        // Source: text lines from a local socket.
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);

        // Transform: parse each line into a WaterSensor.
        SingleOutputStreamOperator<WaterSensor> waterSensorStream = dataStreamSource.map(new WaterSensorMapFunction());

        /*
         * Write to MySQL.
         * 1. Only the legacy sink style (addSink with a SinkFunction) can be used here.
         * 2. JdbcSink.sink takes 4 arguments:
         *    1st: the SQL to execute, usually an "insert into"
         *    2nd: the statement builder that fills the placeholders
         *    3rd: execution options -> batching, retries
         *    4th: connection options -> url, user name, password
         */
        SinkFunction<WaterSensor> sinkFunction = JdbcSink.<WaterSensor>sink(
                INSERT_SQL,
                flinkSinkJdbc::bindParameters,
                JdbcExecutionOptions
                        .builder()
                        .withMaxRetries(3)          // number of retries
                        .withBatchSize(100)         // batch size, in records
                        .withBatchIntervalMs(3000)  // batch interval, in milliseconds
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("********")
                        .withConnectionCheckTimeoutSeconds(60) // connection check / retry timeout, in seconds
                        .build()
        );

        // Attach the MySQL sink and start the job.
        waterSensorStream.addSink(sinkFunction);
        streamExecutionEnvironment.execute();
    }

    /**
     * Fills the three placeholders of the insert statement from one record.
     *
     * <p>{@code ts} and {@code vc} are nullable columns in {@code ws}, so a {@code null} field is
     * bound as SQL NULL instead of being auto-unboxed (which would throw a NullPointerException).
     *
     * @param preparedStatement statement whose placeholders are filled
     * @param waterSensor record supplying the values
     * @throws SQLException if a parameter cannot be set
     */
    private static void bindParameters(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
        preparedStatement.setString(1, waterSensor.getId());

        Long ts = waterSensor.getTs();
        if (ts == null) {
            preparedStatement.setNull(2, java.sql.Types.BIGINT);
        } else {
            preparedStatement.setLong(2, ts);
        }

        Integer vc = waterSensor.getVc();
        if (vc == null) {
            preparedStatement.setNull(3, java.sql.Types.INTEGER);
        } else {
            preparedStatement.setInt(3, vc);
        }

        // NOTE: this prints when the parameters are bound; with batching enabled the row has
        // not necessarily been flushed to MySQL yet.
        System.out.println("数据写入成功:"+'('+waterSensor.getId()+","+waterSensor.getTs()+","+waterSensor.getVc()+")");
    }
}
2.6 启动netcat、Flink,观察数据库写入情况
nc -lk 8888 #启动netcat,并监听8888端口,写入数据
启动Flink程序
查看数据库写入是否正常