实战Flink Java api消费kafka实时数据落盘HDFS

文章目录

  • 1 需求分析
  • 2 实验过程
    • 2.1 启动服务程序
    • 2.2 启动kafka生产
  • 3 Java API 开发
    • 3.1 依赖
    • 3.2 代码部分
  • 4 实验验证
    • STEP1
    • STEP2
    • STEP3
  • 5 时间窗口

1 需求分析

在Java api中,使用flink本地模式,消费kafka主题,并直接将数据存入hdfs中。

flink版本1.13

kafka版本0.8

hadoop版本3.1.4

2 实验过程

2.1 启动服务程序

为了完成 Flink 从 Kafka 消费数据并实时写入 HDFS 的需求,通常需要启动以下组件:

[root@hadoop10 ~]# jps
3073 SecondaryNameNode
2851 DataNode
2708 NameNode
12854 Jps
1975 StandaloneSessionClusterEntrypoint
2391 QuorumPeerMain
2265 TaskManagerRunner
9882 ConsoleProducer
9035 Kafka
3517 NodeManager
3375 ResourceManager

确保 Zookeeper 在运行,因为 Flink 的 Kafka Consumer 需要依赖 Zookeeper。

确保 Kafka Server 在运行,因为 Flink 的 Kafka Consumer 需要连接到 Kafka Broker。

启动 Flink 的 JobManager 和 TaskManager,这是执行 Flink 任务的核心组件。

确保这些组件都在运行,以便 Flink 作业能够正常消费 Kafka 中的数据并将其写入 HDFS。

  • 具体的启动命令在此不再赘述。

2.2 启动kafka生产

  • 当前kafka没有在守护进程后台运行;
  • 创建主题,启动该主题的生产者,在kafka的bin目录下执行;
  • 此时可以生产数据,从该窗口键入任意数据进行发送。
kafka-topics.sh --zookeeper hadoop10:2181 --create --topic topic1 --partitions 1 --replication-factor 1

kafka-console-producer.sh --broker-list hadoop10:9092 --topic topic1

在这里插入图片描述

3 Java API 开发

3.1 依赖

此为项目的所有依赖,包括flink、spark、hbase、ck等,实际本需求无需全部依赖,均可在阿里云或者maven开源镜像站下载。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
           <!-- <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.6</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>

       <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>log4j</artifactId>
                    <groupId>log4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.32</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.20</version>
        </dependency>
    </dependencies>

    <build>
        <extensions>
            <extension>
                <groupId>org.apache.maven.wagon</groupId>
                <artifactId>wagon-ssh</artifactId>
                <version>2.8</version>
            </extension>
        </extensions>

        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>wagon-maven-plugin</artifactId>
                <version>1.0</version>
                <configuration>
                    <!--上传的本地jar的位置-->
                    <fromFile>target/${project.build.finalName}.jar</fromFile>
                    <!--远程拷贝的地址-->
                    <url>scp://root:root@hadoop10:/opt/app</url>
                </configuration>
            </plugin>
        </plugins>

    </build>



</project>

  • 依赖参考
    在这里插入图片描述

3.2 代码部分

  • 请注意kafka和hdfs的部分需要配置服务器地址,域名映射。
  • 此代码的功能是消费topic1主题,将数据直接写入hdfs中。
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作为数据源
        DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink将数据写入HDFS
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // 添加Sink,将Kafka数据直接写入HDFS
        ds1.addSink(sink);
        ds1.print();
        env.execute("Flink Kafka HDFS");
    }
}

4 实验验证

STEP1

运行idea代码,程序开始执行,控制台除了日志外为空。下图是已经接收到生产者的数据后,消费在控制台的截图。

在这里插入图片描述

STEP2

启动生产者,将数据写入,数据无格式限制,随意填写。此时发送的数据,是可以在STEP1中的控制台中看到屏幕打印结果的。
在这里插入图片描述

STEP3

在HDFS中查看对应的目录,可以看到数据已经写入完成。
我这里生成了多个inprogress文件,是因为我测试了多次,断码运行了多次。ide打印在屏幕后,到hdfs落盘写入,中间有一定时间,需要等待,在HDFS中刷新数据,可以看到文件大小从0到被写入数据的过程。
在这里插入图片描述

5 时间窗口

  • 使用另一种思路实现,以时间窗口的形式,将数据实时写入HDFS,实验方法同上。截图为发送数据消费,并且在HDFS中查看到数据。
    在这里插入图片描述

在这里插入图片描述

package day2;

import day2.CustomProcessFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class Test9_kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop10:9092");
        properties.setProperty("group.id", "test");

        // 使用FlinkKafkaConsumer作为数据源
        DataStream<String> ds1 = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));

        String outputPath = "hdfs://hadoop10:8020/out240102";

        // 使用StreamingFileSink将数据写入HDFS
        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .build();

        // 在一个时间窗口内将数据写入HDFS
        ds1.process(new CustomProcessFunction())  // 使用自定义 ProcessFunction
                .addSink(sink);

        // 执行程序
        env.execute("Flink Kafka HDFS");
    }
}
package day2;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class CustomProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // 在这里可以添加具体的逻辑,例如将数据写入HDFS
        System.out.println(value);  // 打印结果到屏幕
        out.collect(value);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/300713.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VQ-VAE(Neural Discrete Representation Learning)论文解读及实现

pytorch 实现git地址 论文地址&#xff1a;Neural Discrete Representation Learning 1 论文核心知识点 encoder 将图片通过encoder得到图片点表征 如输入shape [32,3,32,32] 通过encoder后输出 [32,64,8,8] (其中64位输出维度) 量化码本 先随机构建一个码本&#xff0c;维度…

三大主要排序方法总结:快速排序,选择排序,冒泡排序

本文介绍&#xff1a;三大排序方法&#xff08;快速排序&#xff0c;选择排序&#xff0c;冒泡排序&#xff09;&#xff08;后续期间可能会发布一篇关于qsort函数的文章&#xff09; 自我介绍&#xff1a;一个脑子不好的大一学生&#xff0c;c语言接触还没到半年&#xff0c;…

Spring面试篇

Spring面试篇 前置知识ApplicationContextInitializerApplicationListenerBeanFactoryBeanDefinitionBeanFactoryPostProcesssorAwareInitialzingBean&#xff0c;DisposableBeanBeanPostProcessor SpringBoot启动流程IOC容器初始化流程Bean生命周期Bean循环依赖解决 SpringMvc…

Java-IO流-15

文件操作 文件创建 package com.edu.file;import org.junit.jupiter.api.Test;import java.io.File; import java.io.IOException;public class Demo01 {public static void main(String[] args) {}Test//方式1public void create01(){String filePath "D:\\new1.txt&q…

阿里云服务器地域如何选择?

阿里云服务器地域和可用区怎么选择&#xff1f;地域是指云服务器所在物理数据中心的位置&#xff0c;地域选择就近选择&#xff0c;访客距离地域所在城市越近网络延迟越低&#xff0c;速度就越快&#xff1b;可用区是指同一个地域下&#xff0c;网络和电力相互独立的区域&#…

JSP+Servlet 重要知识点 (含面试题)

JSP是Servlet技术的扩展&#xff0c;本质上就是Servlet的简易方式。JSP编译后是“类servlet”。 这里提一句&#xff1a; jsp已经没有深入学习的必要了&#xff0c;除了维护老项目能用上一些&#xff0c;基本属于被淘汰的边缘了。Servlet还是有必要学习一下&#xff0c;比如sp…

Java:Stream流

文章目录 1、体验Stream流2、Stream流的常见生成方式3、Stream流中间操作方法4、Stream流终结操作方法5、Stream流的收集操作6、Stream流综合练习6.1 练习16.2 练习26.3 练习3 以下代码使用JDK11编写。 1、体验Stream流 &#xff08;1&#xff09;案例需求 按照下面的要求完成…

如何使用css隐藏掉滚动条

1.解决方案 在滚动元素上再包裹一个父元素&#xff0c;然后&#xff0c;该元素添加如下代码&#xff1a; &#xff08;注&#xff1a;PC端浏览器滚动条为8px&#xff09;使元素偏移原来位置8px&#xff0c;目的就是将滚动条区域移动到父元素边框外面&#xff0c;然后&#xff…

AI教我学编程之AI自刀

AI教我学编程系列学习第二课 — C#变量类型 上节回顾知识梳理C#基本变量类型 对话AI分歧产生本段总结 它说得对吗&#xff1f;我随即发问经典AI自刀他来了 总结 上节回顾 在上一节中我们发现&#xff0c;AI工具似乎还不能达到教学的水平&#xff0c;所以在本节中&#xff0c;…

ORA-600 adg无法查询故障

再续前缘 ORA-600[12406]故障解决-CSDN博客 当你点背的时候&#xff0c;看似一个简单的case&#xff0c;总是会迎来反转 上次改完参数没两天&#xff0c;又出现了报错不同&#xff0c;但是现象相似的情况 这次是 ORA-600 [kksgaGetNoAlloc_Int0] 这次出现故障的范围更大&a…

【Spring Boot 源码学习】SpringApplication 的定制化介绍

Spring Boot 源码学习系列 SpringApplication 的定制化介绍 一、引言二、往期内容三、主要内容1. 基础配置1.1 设置关闭 Banner1.2 设置自定义 Banner 打印对象1.3 设置应用程序主入口类1.4 设置用于创建应用程序上下文的工厂1.5 添加 BootstrapRegistry 初始化器实现1.6 设置或…

Python学习之路——函数进阶

目录 一、函数的多返回值 &#xff08;一&#xff09;如何操作 &#xff08;二&#xff09;代码示例 二、函数的多种传参方式 &#xff08;一&#xff09;位置参数 &#xff08;二&#xff09;关键字参数 &#xff08;三&#xff09;缺省参数 1、定义 2、作用 3、代码示…

Spring之代理模式

1、概念 1.1 介绍 二十三种设计模式中的一种&#xff0c;属于结构型模式。它的作用就是通过提供一个代理类&#xff0c;让我们在调用目标方法的时候&#xff0c;不再是直接对目标方法进行调用&#xff0c;而是通过代理类间接调用。让不属于目标方法核心逻辑的代码从目标方法中…

互联网分布式应用之SpringCloud

SpringCloud Java 是第一大编程语言和开发平台。它有助于企业降低成本、缩短开发周期、推动创新以及改善应用服务。如今全球有数百万开发人员运行着超过 51 亿个 Java 虚拟机&#xff0c;Java 仍是企业和开发人员的首选开发平台。 课程内容的介绍 1. 微服务项目介绍 2. Eure…

C++ goto语句

作用&#xff1a;可以无条件跳转语句&#xff0c;类似计算机组成原理mips指令集中的jump直接跳转指令&#xff08;汇编语言&#xff09;。 语法&#xff1a;goto标记&#xff1b; 解释&#xff1a;如果标记的名称存在&#xff0c;执行到goto语句时&#xff0c;会跳转到标记的…

小游戏实战丨基于PyGame的贪吃蛇小游戏

文章目录 写在前面PyGame贪吃蛇注意事项系列文章写在后面 写在前面 本期内容&#xff1a;基于pygame的贪吃蛇小游戏 下载地址&#xff1a;https://download.csdn.net/download/m0_68111267/88700188 实验环境 python3.11及以上pycharmpygame 安装pygame的命令&#xff1a;…

python实现windows内存看门狗程序(带GUI界面)

python实现windows内存看门狗程序&#xff08;带GUI界面&#xff09; 效果图 1、程序核心 看门狗程序核心&#xff1a; 1、运行特定程序任务进程 2、监控任务管理器上的内存使用率 3、如果超过阈值则关闭该特定程序进程 4、重新开启该特定程序 5、重复过程2持续监控2、程序流…

Spring Boot 基础知识点1 (含面试题1)

Spring Boot 是一款基于 Spring 框架的开源应用程序开发工具&#xff0c;它旨在简化 Spring 应用程序的配置和开发过程。Spring Boot 提供了一种简单的方式来创建可独立运行的、生产级别的应用程序&#xff0c;并在需要时进行部署。Spring Boot 在微服务架构和云计算环境下得到…

thinkphp6实现简单定时任务

thinkphp6实现定时任务 创建定时任务文件定义指令编写Test.php代码运行测试 创建定时任务文件 Test类名根据自己的需要修改 php think make:command Test testcommand文件夹在app目录下没有需要自己创建 运行上面的命令后会在command下 多一个Test.php文件 定义指令 在conf…

PHP代码审计之实战审代码篇2

4. 仔细观察如下代码&#xff0c;思考代码有什么缺陷&#xff0c;可能由此引发什么样的问题&#xff1f; <?php require_once("/home/rconfig/classes/usersession.class.php"); require_once("/home/rconfig/classes/ADLog.class.php"); require_onc…