文章目录
- 1.ClickHouse建表
- 2.ClickHouse依赖
- 3.Bean实体类
- 4.ClickHouse业务写入逻辑
- 5.测试写入类
- 6.发送数据
1.ClickHouse建表
ClickHouse中建表
-- Target table for the Flink sink below; column types mirror the User bean
-- (id/age as UInt16, name as String).
CREATE TABLE default.test_write
(
id UInt16,
name String,
age UInt16
) ENGINE = TinyLog(); -- TinyLog: minimal log engine, no indexes/transactions; fine for a demo, not production
2.ClickHouse依赖
Flink开发相关依赖
<!-- Version properties shared by the dependency declarations below -->
<properties>
<flink.version>1.12.1</flink.version>
<scala.version>2.12.13</scala.version>
<clickhouse-jdbc.version>0.1.54</clickhouse-jdbc.version>
<lombok.version>1.18.12</lombok.version>
</properties>
<dependencies>
<!-- ClickHouse JDBC driver: used by the sink to write data into ClickHouse -->
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>${clickhouse-jdbc.version}</version>
</dependency>
<!-- Flink core APIs (Scala 2.12 builds, matching scala.version above) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- DataStream API (used by ClickHouseWriteTest) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Table/SQL API (not strictly required by this demo, but part of the stack) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Lombok: generates getters/setters/builder for the User bean at compile time -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
</dependencies>
3.Bean实体类
User.java
package com.daniel.bean;
import lombok.Builder;
import lombok.Data;
/**
 * Plain data bean carried through the Flink pipeline and written to the
 * ClickHouse table default.test_write (columns id, name, age).
 *
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description User record: Lombok generates getters/setters/equals/hashCode
 * (@Data) and a fluent builder (@Builder).
 **/
@Data
@Builder
public class User {
// NOTE(review): fields are public although @Data already generates accessors;
// the sink reads them directly (user.id etc.), so visibility must stay public here.
public int id;
public String name;
public int age;
}
4.ClickHouse业务写入逻辑
ClickHouseSinkFunction.java
package com.daniel.util;
import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* @Author Daniel
* @Date: 2023/7/3 15:36
* @Description
**/
/**
 * Flink sink that writes each {@link User} record into ClickHouse via JDBC.
 *
 * <p>Lifecycle: {@link #open} creates one JDBC connection per parallel subtask,
 * {@link #invoke} executes the parameterized INSERT for every incoming element,
 * and {@link #close} releases the connection.
 */
public class ClickHouseSinkFunction extends RichSinkFunction<User> {

    Connection conn = null;
    // Parameterized INSERT, e.g. "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)"
    String sql;

    public ClickHouseSinkFunction(String sql) {
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // One connection per subtask; host/port/database are fixed for this demo.
        conn = getConn("localhost", 8123, "default");
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (conn != null) {
            conn.close();
        }
    }

    // Called once per element reaching the sink: binds the record and executes the INSERT.
    @Override
    public void invoke(User user, Context context) throws Exception {
        // try-with-resources: the original leaked one PreparedStatement per element.
        try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
            // setInt matches the declared int fields (the original used setLong).
            preparedStatement.setInt(1, user.id);
            preparedStatement.setString(2, user.name);
            preparedStatement.setInt(3, user.age);
            preparedStatement.addBatch();
            long startTime = System.currentTimeMillis();
            int[] ints = preparedStatement.executeBatch();
            // JDBC forbids commit() while auto-commit is enabled (the ClickHouse
            // driver defaults to auto-commit), so only commit when it is disabled.
            if (!conn.getAutoCommit()) {
                conn.commit();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + ints.length);
        }
    }

    /**
     * Opens a JDBC connection to the given ClickHouse instance.
     *
     * @param host     ClickHouse host
     * @param port     ClickHouse HTTP port (default 8123)
     * @param database target database name
     * @return the opened connection (also stored in {@link #conn})
     * @throws SQLException           if the connection cannot be established
     * @throws ClassNotFoundException if the ClickHouse driver is not on the classpath
     */
    public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
        Class.forName("ru.yandex.clickhouse.ClickHouseDriver");
        String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database;
        conn = DriverManager.getConnection(address);
        return conn;
    }
}
-
open():在SinkFunction实例化后调用,用于初始化连接或资源。每个并行子任务在开始处理数据之前只会调用一次。
-
invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。
-
close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。
5.测试写入类
ClickHouseWriteTest.java
package com.daniel;
import com.daniel.bean.User;
import com.daniel.util.ClickHouseSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @Author daniel
* @Date: 2023/7/3 15:37
* @Description
**/
public class ClickHouseWriteTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Source
DataStream<String> ds = env.socketTextStream("localhost", 9999);
// Transform
SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
String[] split = data.split(",");
return User.builder()
.id(Integer.parseInt(split[0]))
.name(split[1])
.age(Integer.parseInt(split[2]))
.build();
});
// Sink
String sql = "INSERT INTO default.test_write (id, name, age) VALUES (?,?,?)";
ClickHouseSinkFunction jdbcSink = new ClickHouseSinkFunction(sql);
dataStream.addSink(jdbcSink);
env.execute("flink-clickhouse-write");
}
}
6.发送数据
使用nc或者任意工具向指定端口发送数据
例如
nc -lk 9999(Linux/macOS;Windows 版 nc 使用 nc -L -p 9999)
发送数据
1,Daniel,25
2,David,38
3,James,16
4,Robert,27
然后启动ClickHouseWriteTest.java
程序
查询数据
select *
from default.test_write;
由于这里是并行插入,所以没有顺序可言