Table of Contents
- 1. Creating the Doris Table
- 2. Dependencies
- 3. Bean Entity Class
- 4. Doris Sink Logic
- 5. Write Test Class
- 6. Sending Data
1. Creating the Doris Table
Create the table in Doris:
CREATE TABLE IF NOT EXISTS demo.user
(
`id` INT NOT NULL,
`name` VARCHAR(255),
`age` INT
) DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "1"
);
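Since Doris speaks the MySQL protocol on the FE query port (9030, the same port the sink uses later), the DDL can also be run programmatically. Below is a minimal sketch, assuming a localhost FE with user root and an empty password as in the rest of this article; the CreateDorisTable class name is only for illustration, and the CREATE DATABASE statement is only needed if the demo database does not exist yet.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateDorisTable {
    public static void main(String[] args) throws Exception {
        // Assumption: Doris FE query port 9030 on localhost, user root, empty password
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:9030", "root", "");
             Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE DATABASE IF NOT EXISTS demo");
            stmt.execute("CREATE TABLE IF NOT EXISTS demo.user ("
                    + "`id` INT NOT NULL, `name` VARCHAR(255), `age` INT) "
                    + "DISTRIBUTED BY HASH(`id`) "
                    + "PROPERTIES (\"replication_num\" = \"1\")");
            System.out.println("demo.user is ready");
        }
    }
}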
2. Dependencies
Maven dependencies for the Flink job (MySQL JDBC driver, Flink APIs, and Lombok):
<properties>
<flink.version>1.12.1</flink.version>
<scala.version>2.12.13</scala.version>
<mysql.version>8.0.25</mysql.version>
<lombok.version>1.18.12</lombok.version>
</properties>
<dependencies>
<!-- MySQL JDBC driver, used to write data into Doris -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- Flink core APIs -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
3. Bean Entity Class
User.java
package com.daniel.bean;
import lombok.Builder;
import lombok.Data;
/**
* @Author Daniel
* @Date: 2023/7/3 15:35
* @Description
**/
@Data
@Builder
public class User {
public int id;
public String name;
public int age;
}
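Lombok's @Data generates the getters, setters, equals/hashCode, and toString for this class, and @Builder adds a fluent builder, which the test job below relies on. A minimal usage sketch (the UserBuilderDemo class is hypothetical):
import com.daniel.bean.User;

public class UserBuilderDemo {
    public static void main(String[] args) {
        // Build a User with the Lombok-generated builder
        User user = User.builder()
                .id(1)
                .name("Daniel")
                .age(25)
                .build();
        System.out.println(user); // toString() generated by @Data
    }
}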
4. Doris Sink Logic
DorisSinkFunction.java
package com.daniel.util;
import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* @Author Daniel
* @Date: 2023/7/3 15:36
* @Description
**/
public class DorisSinkFunction extends RichSinkFunction<User> {
Connection conn = null;
String sql;
public DorisSinkFunction(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
conn = getConn("localhost", 9030, "demo");
}
@Override
public void close() throws Exception {
super.close();
if (conn != null) {
conn.close();
}
}
// Per-record write logic
@Override
public void invoke(User user, Context context) throws Exception {
// Add the record to a batch and execute it immediately
PreparedStatement preparedStatement = conn.prepareStatement(sql);
preparedStatement.setLong(1, user.id);
preparedStatement.setString(2, user.name);
preparedStatement.setLong(3, user.age);
preparedStatement.addBatch();
long startTime = System.currentTimeMillis();
int[] batchResult = preparedStatement.executeBatch();
long endTime = System.currentTimeMillis();
System.out.println("Batch insert took " + (endTime - startTime) + " ms -- rows inserted: " + batchResult.length);
}
public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
Class.forName("com.mysql.cj.jdbc.Driver");
String address = "jdbc:mysql://" + host + ":" + port + "/" + database;
conn = DriverManager.getConnection(address, "root", "");
return conn;
}
}
- open(): called once per parallel subtask, after the SinkFunction is instantiated and before any elements are processed; used to initialize connections or other resources.
- invoke(): defines the logic executed for every element that reaches the sink; this is where the data is written to the external storage system (a tidied-up variant is sketched below).
- close(): called before the SinkFunction shuts down; used to release resources, close connections, and so on.
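Note that invoke() above prepares a new statement for every record and never closes it. Below is a hedged alternative sketch that keeps the same SQL and connection but closes the statement with try-with-resources and uses setInt for the INT columns:
@Override
public void invoke(User user, Context context) throws Exception {
    // One statement per record, closed automatically by try-with-resources
    try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
        preparedStatement.setInt(1, user.id);
        preparedStatement.setString(2, user.name);
        preparedStatement.setInt(3, user.age);
        preparedStatement.executeUpdate();
    }
}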
5. Write Test Class
DorisWriteTest.java
package com.daniel;
import com.daniel.bean.User;
import com.daniel.util.DorisSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author Daniel
* @Date: 2023/7/3 15:37
* @Description
**/
public class DorisWriteTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Source
DataStream<String> ds = env.socketTextStream("localhost", 9999);
// Transform
SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
String[] split = data.split(",");
return User.builder()
.id(Integer.parseInt(split[0]))
.name(split[1])
.age(Integer.parseInt(split[2]))
.build();
});
// Sink
String sql = "INSERT INTO demo.user (id, name, age) VALUES (?,?,?)";
DorisSinkFunction jdbcSink = new DorisSinkFunction(sql);
dataStream.addSink(jdbcSink);
env.execute("flink-doris-write");
}
}
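The map step above assumes every socket line contains exactly three comma-separated fields; a malformed line would throw and fail the job. A sketch of a more tolerant variant using flatMap, which simply skips bad lines; the explicit returns() call is needed because type erasure prevents Flink from recovering the output type of a lambda that uses Collector, and the safeStream name is only illustrative:
// Tolerant parsing: skip malformed lines instead of failing the job
// (requires: import org.apache.flink.util.Collector;)
SingleOutputStreamOperator<User> safeStream = ds
        .flatMap((String line, Collector<User> out) -> {
            String[] f = line.split(",");
            if (f.length == 3) {
                try {
                    out.collect(User.builder()
                            .id(Integer.parseInt(f[0].trim()))
                            .name(f[1].trim())
                            .age(Integer.parseInt(f[2].trim()))
                            .build());
                } catch (NumberFormatException ignored) {
                    // skip lines whose id or age is not numeric
                }
            }
        })
        .returns(User.class);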
6. Sending Data
Use nc (or any similar tool) to send data to the specified port, for example:
nc -L -p 9999
(On Linux/macOS the equivalent is nc -lk 9999.)
Send the following data:
1,Daniel,25
2,David,38
3,James,16
4,Robert,27
Then start the DorisWriteTest.java program.
Query the data:
select *
from demo.user;
Because the rows are inserted in parallel, the query result comes back in no guaranteed order.
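To verify the result from code instead of a SQL client, a small read-back sketch over the same MySQL-protocol connection (localhost:9030, root, empty password assumed; the DorisReadCheck class is hypothetical). The ORDER BY id clause only provides a stable display order for the check:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DorisReadCheck {
    public static void main(String[] args) throws Exception {
        // Assumption: same Doris FE connection details as the sink (localhost:9030, root, empty password)
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:9030/demo", "root", "");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id, name, age FROM demo.user ORDER BY id")) {
            while (rs.next()) {
                System.out.printf("%d,%s,%d%n", rs.getInt("id"), rs.getString("name"), rs.getInt("age"));
            }
        }
    }
}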