【极数系列】Flink项目入门搭建(03)

【极数系列】Flink项目入门搭建(03)

引言

gitee地址:https://gitee.com/shawsongyue/aurora.git
源码直接下载可运行,模块:aurora_flink
Flink 版本:1.18.0
Jdk 版本:11

1.创建mavenx项目

在这里插入图片描述

2.包结构

在这里插入图片描述

3.引入pom依赖

tips:transformer处写主启动类

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!--属性设置-->
    <properties>
        <!--java_JDK版本-->
        <java.version>11</java.version>
        <!--maven打包插件-->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!--编译编码UTF-8-->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!--输出报告编码UTF-8-->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!--json数据格式处理工具-->
        <fastjson.version>1.2.75</fastjson.version>
        <!--log4j版本-->
        <log4j.version>2.17.1</log4j.version>
        <!--flink版本-->
        <flink.version>1.18.0</flink.version>
        <!--scala版本-->
        <scala.binary.version>2.11</scala.binary.version>
        <!--log4j依赖-->
        <log4j.version>2.17.1</log4j.version>
    </properties>

    <!--通用依赖-->
    <dependencies>

        <!-- json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!--================================集成外部依赖==========================================-->
        <!--集成日志框架 start-->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!--集成日志框架 end-->
    </dependencies>

    <!--编译打包-->
    <build>
        <finalName>${project.name}</finalName>
        <!--资源文件打包-->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>

        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>org.google.code.flindbugs:jar305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <excluder>org.apache.logging.log4j:*</excluder>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <!--插件统一管理-->
        <pluginManagement>
            <plugins>
                <!--maven打包插件-->
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                    <configuration>
                        <fork>true</fork>
                        <finalName>${project.build.finalName}</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <goals>
                                <goal>repackage</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>

                <!--编译打包插件-->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!--配置Maven项目中需要使用的远程仓库-->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!--用来配置maven插件的远程仓库-->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>

</project>

4.增加log4j2.properties配置

tips:resource目录下增加该配置,主要用于日志打印

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

5.创建主启动类

tips:编写了一个简单的有界数据流处理demo程序

  • step1:创建flink程序运行所需环境
  • step2:创建数据集
  • step3:把有限数据集转换为数据源
  • step4:简单通过flatmap处理数据
  • step5:输出最终结果
  • step6:启动任务
package com.aurora;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;

/**
 * @author 浅夏的猫
 * @description 主启动类
 * @date 22:46 2024/1/13
 */
public class Application {

    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) throws Exception {

        //1.创建flink程序运行所需环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.创建数据集
        ArrayList<String> list = new ArrayList<>();
        list.add("001");
        list.add("002");
        list.add("003");

        //3.把有限数据集转换为数据源
        DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);

        //4.简单通过flatmap处理数据,
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String record, Collector<String> collector) throws Exception {
                //数据追加随机数
                String uuidRecord=record+ UUID.randomUUID().toString();
                //当前环节处理完需要传递数据给下个环节
                collector.collect(uuidRecord);
            }
        });

        //5.输出最终结果
        flatMap.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) throws Exception {
                logger.info("当前正在处理的数据:{}",value);
            }
        }).setParallelism(1);

        //6.启动任务
        env.execute();
    }
}

6.构建打jar包

在这里插入图片描述

7.flinkUI页面部署

1.点击add new上传对应的应用包

2.主类填写com.aurora.Application

3.检查任务running状态,大概几秒钟跑完
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/343577.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于taro搭建小程序多项目框架

前言 为什么需要这样一个框架&#xff0c;以及这个框架带来的好处是什么&#xff1f; 从字面意思上理解&#xff1a;该框架可以用来同时管理多个小程序&#xff0c;并且可以抽离公用组件或业务逻辑供各个小程序使用。当你工作中面临这种同时维护多个小程序的业务场景时&#xf…

Unity中UGUI在Mask剪裁粒子特效的实现

在Unity使用Mask是剪裁不了粒子特效的&#xff0c;之前有想过RenderTexture来实现&#xff0c;不过使用RenderTexture不适合用于很多个特效&#xff0c;因为RenderTexture依赖Camera的照射&#xff0c;如果在背包中每种道具都有不同的特效&#xff0c;那使用RenderTexture则需要…

对接钉钉机器人发送钉钉通知

实现效果 话不多说 直接上代码 static void sendMsg(String msg) {new Thread(()->{try {String content "{\"msgtype\": \"text\",\"text\": {\"content\": \"" msg "\"}}";HttpUtil.simplePos…

Unity 桥接模式(实例详解)

文章目录 示例1&#xff1a;角色与装备系统示例2&#xff1a;UI控件库示例3&#xff1a;渲染引擎模块示例4&#xff1a;AI决策树算法示例5&#xff1a;物理模拟引擎 在Unity游戏开发中&#xff0c;桥接模式&#xff08;Bridge Pattern&#xff09;是一种设计模式&#xff0c;它…

kafka(一)快速入门

一、kafka&#xff08;一&#xff09;是什么&#xff1f; kafka是一个分布式、支持分区、多副本&#xff0c;基于zookeeper协调的分布式消息系统&#xff1b; 二、应用场景 日志收集&#xff1a;一个公司可以用Kafka收集各种服务的log&#xff0c;通过kafka推送到各种存储系统…

php基础学习之整型进制

不同进制的整型数据定义 在 PHP中提供了四种整型的定义方式&#xff1a;十进制定义&#xff0c;二进制定义&#xff0c;八进制定义和十六进制。 定义格式如下&#xff1a; 十进制是最基础的&#xff1a;$a 110;二进制需要在值前面加上0b&#xff1a;$a 0B1101110;&#xf…

Java线程池,看这一篇足够

目录一览 Java线程池1. Executors提供6个线程池快捷创建方式2. ThreadPoolExecutor的7大参数3. 自定义线程池 Java线程池 上一篇《Async注解的注意事项》说到Async注解要配合自定义线程池一起使用&#xff0c;这一节说下Java的线程池。 1. Executors提供6个线程池快捷创建方式…

第八篇 交叉编译华为云Iot SDK到Orangepi3B

本篇主要内容&#xff1a; 一、交叉编译华为云Iot SDK依赖1.宿主机安装交叉编译工具链&#xff08;1&#xff09;选择下载交叉编译工具链&#xff08;2&#xff09;解压、添加环境变量、重启2.交叉编译依赖库&#xff08;0&#xff09; 准备工作&#xff08;1&#xff09; 交叉…

MySQL>基础sql语句

阅读目录 1.进入数据库2.数据库操作&#xff08;增删改查用&#xff09;3.表操作(增删改查)4.语句操作(增删改查) 回到顶部 1.进入数据库 打开终端,输入&#xff1a; /usr/local/mysql/bin/mysql -uroot -p回车 输入密码&#xff1a; 回到顶部 2.数据库操作&#xff08;增…

RabbitMQ环境配置

文章目录 安装Erlang安装RabbitMQ 安装Erlang 下载地址&#xff1a;http://erlang.org/download/otp_win64_25.3.2.7.exe 安装RabbitMQ 下载地址&#xff1a;https://www.rabbitmq.com/install-windows.html 进入RabbitMQ安装目录下的sbin目录 输入以下命令启动管理功能 …

Java 设计者模式以及与Spring关系(七) 命令和迭代器模式

简介: 本文是个系列一次会出两个设计者模式作用&#xff0c;如果有关联就三个&#xff0c;除此外还会讲解在spring中作用。 23设计者模式以及重点模式 我们都知道设计者模式有3类23种设计模式&#xff0c;标红是特别重要的设计者模式建议都会&#xff0c;而且熟读于心&#…

Django开发_17_表单类

一、介绍 为了简化前端form表单代码 二、步骤 &#xff08;一&#xff09;创建form.py 创建一个表单类 from django import formsclass RegisterForm(forms.Form):reg_name forms.CharField(max_length10, label用户名)reg_pwd forms.CharField(max_length20, label密码…

python Seq2Seq模型源码实战,超详细Encoder-Decoder模型解析实战;早期机器翻译模型源码demo

1.Seq2Seq&#xff08;Encoder-Decoder&#xff09;模型简介 Seq2Seq&#xff08;Encoder-Decoder&#xff09;模型是一种常用于序列到序列&#xff08;sequence-to-sequence&#xff09;任务的深度学习模型。它由两个主要的组件组成&#xff1a;编码器&#xff08;Encoder&am…

顶顶通呼叫中心中间件机器人压力测试配置(mod_cti基于FreeSWITCH)

介绍 顶顶通呼叫中心中间件机器人压力测试(mod_cit基于FreeSWITCH) 一、配置acl.conf 打开ccadmin-》点击配置文件-》点击acl.conf-》我这里是已经配置好了的&#xff0c;这里的192.168.31.145是我自己的内网IP&#xff0c;你们还需要自行修改 二、配置线路 打开ccadmin-&g…

社区分享|百果园选择DataEase搭档蜜蜂微搭实现企业数据应用一体化

百果园&#xff0c;全称为深圳百果园实业&#xff08;集团&#xff09;股份有限公司&#xff0c;2001年12月成立于深圳&#xff0c;2002年开出中国第一家水果专卖店。截至2022年11月&#xff0c;百果园全国门店数量超过5600家&#xff0c;遍布全国140多个城市&#xff0c;消费会…

实现单链表的增删改查

实现单链表的增删改查的功能&#xff1a;头部插入删除/尾部插入删除&#xff0c;查找&#xff0c;在指定位置之前插入数据&#xff0c;删除pos节点&#xff0c;在指定位置之后插入数据&#xff0c;删除pos之后的节点&#xff0c;销毁链表。 SListNode.h #pragma once #includ…

工程化代码管理高频面试题

1. git常用命令以及工作中都怎么工作 git init 初始化仓库 ​ git status 查看当前各个区域的代码状态。 ​ git log查看commit记录 ​ git reflog查看完整记录 ​ git add 添加工作区代码到暂存区 ​ Git commit 暂存区代码的提交 ​ git reset 代码的版本回退 ​ git stash …

Python中的open与JSON的使用

目录 1 使用 open 函数进行文件操作 2 使用 json 模块进行 JSON 数据处理&#xff1a; 2.1 写入JSON 文件 2.2 读取JSON 文件 在 Python 中&#xff0c;open 函数和 json 模块常用于文件的读写和 JSON 数据的处理。 1 使用 open 函数进行文件操作 open 函数用于打开文件…

docker安装Rabbitmq教程(详细图文)

目录 1.下载Rabbitmq的镜像 2.创建并运行rabbitmq容器 3.启动web客户端 4.访问rabbitmq的微博客户端 5.遇到的问题 问题描述&#xff1a;在rabbitmq的web客户端发现界面会弹出如下提示框Stats in management UI are disabled on this node 解决方法 &#xff08;1&#…

【JAVA语言-第14话】集合框架(一)——Collection集合,迭代器,增强for,泛型

目录 集合框架 1.1 概述 1.2 集合和数组的区别 1.3 Collection集合 1.3.1 概述 1.3.2 常用方法 1.4 迭代器 1.4.1 概述 1.4.2 常用方法 1.4.3 使用步骤 1.5 增强for循环 1.5.1 概述 1.5.2 使用 1.6 泛型 1.6.1 概述 1.6.2 使用泛型的利弊 1.6.2.1 好处 1…