Flink写入数据到Doris

文章目录

      • 1.Doris建表
      • 2.Doris依赖
      • 3.Bean实体类
      • 4.Doris业务写入逻辑
      • 5.测试写入类
      • 6.发送数据

1.Doris建表

Doris中建表

CREATE TABLE IF NOT EXISTS demo.user
(
 `id`   INT NOT NULL,
 `name` VARCHAR(255),
 `age`  INT
) DISTRIBUTED BY HASH(`id`)
PROPERTIES (
 "replication_num" = "1"
);

2.Doris依赖

Flink开发相关依赖

    <properties>
        <flink.version>1.12.1</flink.version>
        <scala.version>2.12.13</scala.version>
        <mysql.version>8.0.25</mysqlc.version>
        <lombok.version>1.18.12</lombok.version>
    </properties>

    <dependencies>
        <!-- 写入数据到doris -->
         <dependency>
             <groupId>mysql</groupId>
             <artifactId>mysql-connector-java</artifactId>
             <version>${mysql.version</version>
         </dependency>
        <!-- flink核心API -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
        </dependency>
    </dependencies>

3.Bean实体类

User.java

package com.daniel.bean;

import lombok.Builder;
import lombok.Data;

/**
 * @Author Daniel
 * @Date: 2023/7/3 15:35
 * @Description
 **/

@Data
@Builder
public class User {
    public int id;
    public String name;
    public int age;
}

4.Doris业务写入逻辑

DorisSinkFunction.java

package com.daniel.util;

import com.daniel.bean.User;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
* @Author Daniel
* @Date: 2023/7/3 15:36
* @Description
**/


public class DorisSinkFunction extends RichSinkFunction<User> {
 Connection conn = null;
 String sql;

 public DorisSinkFunction(String sql) {
     this.sql = sql;
 }

 @Override
 public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     conn = getConn("localhost", 9030, "demo");
 }

 @Override
 public void close() throws Exception {
     super.close();
     if (conn != null) {
         conn.close();
     }
 }

 // 定义具体的操作
 @Override
 public void invoke(User user, Context context) throws Exception {
     // 批量插入
     PreparedStatement preparedStatement = conn.prepareStatement(sql);
     preparedStatement.setLong(1, user.id);
     preparedStatement.setString(2, user.name);
     preparedStatement.setLong(3, user.age);
     preparedStatement.addBatch();

     long startTime = System.currentTimeMillis();
     int[] batchResult = preparedStatement.executeBatch();
     long endTime = System.currentTimeMillis();
     System.out.println("批量插入用时:" + (endTime - startTime) + "ms -- 插入数据行数:" + batchResult.length);
 }

 public Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException {
     Class.forName("com.mysql.cj.jdbc.Driver");
     String address = "jdbc:mysql://" + host + ":" + port + "/" + database;
     conn = DriverManager.getConnection(address, "root", "");
     return conn;
 }
}
  • open():在SinkFunction实例化后调用,用于初始化连接或资源。这在处理每个并行任务的子任务之前只被调用一次。

  • invoke():定义了在每个元素到达Sink操作时所执行的逻辑。用户需要实现这个方法来定义如何将数据写入外部存储系统或执行其他操作。

  • close():在SinkFunction关闭之前调用,用于释放资源、关闭连接等操作。

5.测试写入类

DorisWriteTest.java

package com.daniel;

import com.daniel.bean.User;
import com.daniel.util.DorisSinkFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


/**
* @Author Daniel
* @Date: 2023/7/3 15:37
* @Description
**/

public class DorisWriteTest {
 public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
     // Source
     DataStream<String> ds = env.socketTextStream("localhost", 9999);

     // Transform
     SingleOutputStreamOperator<User> dataStream = ds.map((MapFunction<String, User>) data -> {
         String[] split = data.split(",");
         return User.builder()
                 .id(Integer.parseInt(split[0]))
                 .name(split[1])
                 .age(Integer.parseInt(split[2]))
                 .build();
     });

     // Sink
     String sql = "INSERT INTO demo.user (id, name, age) VALUES (?,?,?)";
     DorisSinkFunction jdbcSink = new DorisSinkFunction(sql);
     dataStream.addSink(jdbcSink);
     env.execute("flink-doris-write");
 }
}


6.发送数据

使用nc或者任意工具向指定端口发送数据
例如

nc -L -p 9999

发送数据

1,Daniel,25
2,David,38
3,James,16
4,Robert,27

然后启动DorisWriteTest.java程序

在这里插入图片描述
查询数据

 select *
 from demo.user;

由于这里是并行插入,所以没有顺序可言

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/43386.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

顺丰基于 Flink CDC + Hudi 推进实时业务落地

摘要&#xff1a;本文整理自大数据研发高级工程师唐尚文&#xff0c;在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为三个部分&#xff1a; 应用场景 实践与优化 未来规划 点击查看原文视频 & 演讲PPT 一、应用场景 1.1 顺丰集团业务概览 顺丰除了大家…

宝塔的Redis绑定IP

宝塔安装Redis 软件商店搜索Redis 连接宝塔面板的redis服务器失败的解决办法 检查Linux是否放行6379端口修改Redis绑定IP检查阿里云/腾讯云的防火墙策略是否放行6379端口 1.bind 127.0.0.1 修改为 bind 0.0.0.0 127.0.0.1 表示只允许本地访问,无法远程连接 0.0.0.0 表…

51单片机--AT24C02数据存储

文章目录 存储器的介绍AT24C02I2C总线I2C时序结构AT24C02数据帧AT24C02数据存储实例 存储器的介绍 存储器是计算机系统中的一种重要设备&#xff0c;用于存储程序和数据&#xff0c;它可以通过电子、磁性介质等技术来记录和保持数据。在这里&#xff0c;主要介绍的是随机存储器…

TableGPT: Towards Unifying Tables, Nature Language and Commands into One GPT

论文标题&#xff1a;TableGPT: Towards Unifying Tables, Nature Language and Commands into One GPT 论文地址&#xff1a;https://github.com/ZJU-M3/TableGPT-techreport/blob/main/TableGPT_tech_report.pdf 发表机构&#xff1a;浙江大学 发表时间&#xff1a;2023 本文…

BeanFactory容器的构建和使用示例

BeanFactory容器的实现流程&#xff1a; BeanFactory是Spring框架中的一部分&#xff0c;它提供了IoC&#xff08;控制反转&#xff09;的实现机制。下面是BeanFactory的IoC实现过程&#xff1a; 定义Bean定义&#xff1a;首先&#xff0c;我们需要在配置文件中定义Bean的定义…

详解分类指标Precision,Recall,F1-Score

文章目录 1. Precision&#xff08;精度&#xff09;2. Recall&#xff08;召回率&#xff09;3. F1-Score4. Accuracy&#xff08;准确率&#xff09;5. P-R 曲线6. TPR、FPR6.1 TPR&#xff08;真正率&#xff09;6.2 FPR&#xff08;假正率&#xff09; 7. ROC曲线8. AUC曲线…

vscode使用g++编译.c文件或.cpp文件

vscode是一个跨平台、轻量级、插件非常丰厚的IDE&#xff0c;这里介绍在vscode里使用g来编译.cpp文件。g也叫GCC, 在Window中&#xff0c;是使用MinGW方式实现g的&#xff0c;它分为32位和64位2个版本&#xff0c;其中&#xff0c;MinGW-64是64位的&#xff0c;MinGW-32是32位的…

linux学成之路(基础篇)(二十)rsync服务器

目录 前言 一、概述 监听端口 二、特点 快捷 安全 三、数据的同步方式 四、rsync传输方式 本地传输 远程传输 守护进程 五、命令 作为远程命令 作为rsync服务 选项 六、配置文件 全局配置 模块配置 守护进程传输 七、rsyncinotfy实时同步 一、服务端 二、…

力扣奇遇记 [第一章]

文章目录 &#x1f626;第一题&#xff1a;拿下LeetCode1769. 移动所有球到每个盒子所需的最小操作数学习内容&#xff1a;LeetCode1769. 移动所有球到每个盒子所需的最小操作数&#x1f648;思路分析&#xff1a;&#x1f496;代码产出&#xff1a; &#x1f626;第二题&#…

优雅的设计测试用例

⭐️前言⭐️ 入职以后接触到了公司的具体业务&#xff0c;提升了设计测试用例的能力&#xff0c;于是沉淀出这篇文档与大家分享。 &#x1f349;欢迎点赞 &#x1f44d; 收藏 ⭐留言评论 &#x1f4dd;私信必回哟&#x1f601; &#x1f349;博主将持续更新学习记录收获&…

【C语言】表达式求值相关问题汇总—>隐式类型转换(整型提升)、算数转换与操作符优先级汇总(收藏查阅)

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负。 目录 前言&#xff1a; 一、隐式类型转换 &#xff08;一&#xff09;整型提升的意义…

后端(四):博客系统项目

咱们在这里实现的是后端项目&#xff0c;前端代码就提一提&#xff0c;不全做重点介绍&#xff0c;在开始讲解这个博客系统项目之前&#xff0c;我们先看看这个项目的前端界面&#xff1a; 登录界面&#xff1a; 个人主页&#xff1a; 博客详情页&#xff1a; 写博客页&#x…

再见 Spring Boot 1.X ,Spring Boot 2.X 走向舞台中心

2019年8月6日&#xff0c;Spring 官方在其博客宣布&#xff0c;Spring Boot 1.x 停止维护&#xff0c;Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号&#xff0c;Spring 官方就已经在博客进行过预告&#xff0c;Spring Boot 1.X 将维护到2019年8月1日。 1.5.x 将会…

【Java】重写compareTo()方法给对象数组排序

我们先给一个数组排序&#xff0c;我们肯定用的是Arrays.sort()方法&#xff1a; public class test2 {public static void main(String[] args) {int[] arr{3,5,4,6,9,8,1};System.out.println(Arrays.toString(arr));System.out.println("---------");Arrays.sort…

【C语言初阶】指针的运算or数组与指针的关系你了解吗?

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《快速入门C语言》《C语言初阶篇》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 &#x1f4cb; 前言&#x1f4ac; 指针运算&#x1f4ad; 指针-整数&#x1f4ad; 指针-指针&#x1f4ad; 指针…

【Java基础教程】(四十二)多线程篇 · 上:多进程与多线程、并发与并行的关系,多线程的实现方式、线程流转状态、常用操作方法解析~

Java基础教程之多线程 上 &#x1f539;本节学习目标1️⃣ 线程与进程&#x1f50d;关于多进程、多线程、并发与并行之间的概念关系&#xff1f; 2️⃣ 多线程实现2.1 继承 Thread 类2.2 实现 Runnable 接口2.3 多线程两种实现方式的区别2.4 利用 Callable 接口实现多线程2.5 …

数学建模学习(4):TOPSIS 综合评价模型及编程实战

一、数据总览 需求&#xff1a;我们需要对各个银行进行评价&#xff0c;A-G为银行的各个指标&#xff0c;下面是银行的数据&#xff1a; 二、代码逐行实现 清空代码和变量的指令 clear;clc; 层次分析法 每一行代表一个对象的指标评分 p [8,7,6,8;7,8,8,7];%每一行代表一个…

为Android构建现代应用——设计原则

为Android构建现代应用——设计原则 - 掘金 state”是声明性观点的核心 在通过Compose或SwiftUI等框架设计声明性视图时&#xff0c;我们必须明确的第一个范式是State(状态)。UI组件结合了它的图形表示(View)和它的State(状态)。UI组件中发生变化的任何属性或数据都可以…

RuoYi-VUE : make sure to provide the “name“ option

前言 略 错误 错误原因 theme-picker 组件未被注册。 解决 src/App.vue代码恢复成若依的代码即可。&#xff08;PS&#xff1a;不知道代码被谁修改了&#xff09; 缺少这一段&#xff1a; <script> import ThemePicker from "/components/ThemePicker";…

hive基础

目录 DDL&#xff08;data definition language&#xff09; 创建数据库 创建表 hive中数据类型 create table as select建表 create table like语法 修改表名 修改列 更新列 替换列 清空表 关系运算符 聚合函数 字符串函数 substring:截取字符串 replace :替换…