Flink定制化功能开发,demo代码

前言:

       这是一个Flink自定义开发的基础教学。本文将通过flink的DataStream模块API,以kafka为数据源,构建一个基础测试环境;包含一个kafka生产者线程工具,一个自定义FilterFunction算子,一个自定义MapFunction算子,用一个flink任务的代码逻辑,将实时读kafka并多层处理串起来;让读者体会通过Flink构建自定义函数的技巧。

一、Flink的开发模块分析

Flink提供四个基础模块:核心SDK开发API分别是处理实时计算的DataStream和处理离线计算的DataSet;基于这两个SDK,在其上包装了TableAPI开发模块的SDK;在Table API之上,定义了高度抽象可用SQL开发任务的FlinkSQL。在核心开发API之下,还有基础API的接口,可用于对时间,状态,算子等最细粒度的特性对象做操作,如包装自定义算子的ProcessWindowFunction和ProcessFunction等基础函数以及内置的对象状态StateTtlConfig;

FLINK开发API关系结构如下:

二、定制化开发Demo演示

2.1 场景介绍

Flink实时任务的的通用技术架构是消息队列中间件+Flink任务:

将数据采集到Kafka或pulser这类队列中间件的Topic,然后使用Flink内置的kafkaSource,监控Topic的数据情况,做实时处理。

  1. 这里提供一个kafka的生产者线程,可以自定义构建需要的数据和上传时间,用于控制写入kafka的数据源;
  2. 重写两个DataStream的基础算子:FilterFunction和MapFunction,用于让读者体会,如何对FLINK函数的重新包装,后续更基础的函数原理一样;我这里用String数据对象做处理,减少对象转换的SDK引入,通常要基于业务做数据polo的加工,这个自己处理,将对象换成业务对象;
  3. 然后使用Flink将整个业务串起来,从kafka读数据,经过两层处理,最终输出需要的结果;

2.2 本地demo演示

2.2.1 pom文件

这里以flink1.14.6+scala1.12版本为例:

2.2.2 kafka生产者线程方法

package org.example.util;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.*;

/**
 * 向kafka生产数据
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */

public class KafkaProducerUtil extends Thread {

    private String topic;

    public KafkaProducerUtil(String topic) {
        super();
        this.topic = topic;
    }

    private static Producer<String, String> createProducer() {
        // 通过Properties类设置Producer的属性
        Properties properties = new Properties();
//        测试环境 kafka 配置
        properties.put("bootstrap.servers", "ip2:9092,ip:9092,ip3:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<String, String>(properties);
    }

    @Override
    public void run() {
        Producer<String, String> producer = createProducer();
        Random random = new Random();
        Random random2 = new Random();

        while (true) {
            int nums = random.nextInt(10);
            int nums2 = random.nextInt(50);
//            double nums2 = random2.nextDouble();

            String time = new Date().getTime() / 1000 + 5 + "";
            String type = "pv";
            try {
                if (nums2 % 2 == 0) {
                    type = "pv";
                } else {
                    type = "uv";

                }
//                String info = "{\"user\":" + nums + ",\"item\":" + nums * 10 + ",\"category\":" + nums2 + ",\"pv\":" + nums2 * 5 + ",\"ts\":\"" + time + "\"}";
                String info = nums + "=" + nums2;

                System.out.println("message : " + info);
                producer.send(new ProducerRecord<String, String>(this.topic, info));
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("=========数据已经写入==========");
            
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        new KafkaProducerUtil("test01").run();
    }
    
    public static void sendMessage(String topic, String message) {
        Producer<String, String> producer = createProducer();
        producer.send(new ProducerRecord<String, String>(topic, message));
    }
    
}
2.2.3 自定义基础函数

这里自定义了filter和map两个算子函数,测试逻辑按照数据结构变化:

自定义FilterFunction函数算子:阈值小于40的过滤掉

package org.example.funtion;

import org.apache.flink.api.common.functions.FilterFunction;

/**
 * FilterFunction重构
 *
 * @author i7杨
 * @date 2024/01/12 13:02:29
 */

public class InfoFilterFunction implements FilterFunction<String> {

    private double threshold;

    public InfoFilterFunction(double threshold) {
        this.threshold = threshold;
    }

    @Override
    public boolean filter(String value) throws Exception {

        if (value.split("=").length == 2)
            // 阈值过滤
            return Double.valueOf(value.split("=")[1]) > threshold;
        else return false;
    }
}

自定义MapFunction函数:后缀为2的,添加上特殊信息

package org.example.funtion;

import org.apache.flink.api.common.functions.MapFunction;

public class ActionMapFunction implements MapFunction<String, String> {

    @Override
    public String map(String value) throws Exception {
        System.out.println("value:" + value);
        if (value.endsWith("2"))
            return value.concat(":Special processing information");
        else return value;
    }
}

2.2.4 flink任务代码

任务逻辑:使用kafka工具产生数据,然后监控kafka的topic,讲几个函数串起来,输出结果;

package org.example.service;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.example.funtion.ActionMapFunction;
import org.example.funtion.InfoFilterFunction;

import java.util.*;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {

        // 设置执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka 配置
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip1:9092,ip2:9092,ip3:9092");
        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group");
        kafkaProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 创建 Kafka 消费者
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "test01",// Kafka 主题名称
                new SimpleStringSchema(),
                kafkaProps);

        // 从 Kafka 中读取数据流
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);
        env.disableOperatorChaining();

        kafkaStream
                .filter(new InfoFilterFunction(40))
                .map(new ActionMapFunction())
                .print("阈值大于40以上的message=");

        // 执行任务
        env.execute("This is a testing task");
    }


}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/315930.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Spring Boot - Application Events 的发布顺序_ApplicationReadyEvent

文章目录 Pre概述Code源码分析 Pre Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent 概述 Spring Boot 的广播机制是基于观察者模式实现的&#xff0c;它允许在 Spring 应用程序中发布和监听事件。这种机制的主要目的是为了实现解耦&#…

每日一练:LeeCode-111. 二叉树的最小深度【二叉树】

本文是力扣LeeCode-111. 二叉树的最小深度 学习与理解过程&#xff0c;本文仅做学习之用&#xff0c;对本题感兴趣的小伙伴可以出门左拐LeeCode。 给定一个二叉树&#xff0c;找出其最小深度。 最小深度是从根节点到最近叶子节点的最短路径上的节点数量。 说明&#xff1a;叶子…

Java异常处理--异常处理的方式1:try-catch-finally

文章目录 一、异常处理概述二、方式1&#xff1a;捕获异常&#xff08;try-catch-finally&#xff09;&#xff08;1&#xff09;抓抛模型&#xff08;2&#xff09;try-catch-finally基本格式1、基本语法2、整体执行过程3、try和catch3.1 try3.2 catch (Exceptiontype e) &…

掌握 gRPC 和 RPC 的关键区别

一、远程过程调用协议简介 1、RPC 的本质 首先&#xff0c;我们探讨一下什么是 RPC。RPC&#xff0c;缩写为 Remote Procedure Call Protocol&#xff0c;直译来看就是远程过程调用协议。 讲得通俗一些&#xff1a; RPC 是一种通信机制RPC 实现了客户端/服务器通信模型 官…

【数据集处理】FFHQ如何进行人脸对齐,Aligned and cropped images at 1024×1024

什么是人脸对齐&#xff1f; 人脸对齐是一种图像处理技术&#xff0c;旨在将图像中的人脸部分对齐到一个标准位置或形状。在许多情况下&#xff0c;这通常涉及将眼睛、鼻子和嘴巴等关键点对齐到特定的位置。通过这种方式&#xff0c;所有的人脸图像可以有一个一致的方向和尺寸…

【JVM的相关参数和调优】

文章目录 JVM 调优的参数类型一、标配参数二、X参数三、XX参数 JVM 调优的常用参数 JVM 调优的参数类型 一、标配参数 这类此参数在jdk的各个版本之间很少会变化&#xff0c;基本不改变 java -version&#xff0c;查看当前电脑上的jdk的版本信息 java -help&#xff0c;查看…

阴盘奇门八字排盘马星位置计算方法php代码

如下位置&#xff0c;马星的四个位置。 计算方法&#xff1a; 1。先根据出生年月日&#xff0c;计算得八字四柱。比如 2024年01月09日&#xff0c;四柱为 其中时柱地支为“申” 2。然后根据以下对应的数组&#xff0c;来找到id号&#xff0c;即马星位置。 根据下表来找到&am…

开机自启动android app

Android App开机自启动_android 开机自启动-CSDN博客 注意权限问题&#xff1a; 第二种实现方式&#xff1a;系统桌面应用 问&#xff1a;android的系统桌面应用启动是什么&#xff1a; 答&#xff1a; Android 系统桌面应用是指用户在设备主屏幕上看到的默认启动界面&…

What does `HandlerInterceptor` do?

HandlerInterceptor 是 SpringMVC 中的一个接口&#xff0c;在SpringMVC应用中它提供了一种实现应用级拦截器的机制。 第1步&#xff1a;引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web<…

利用 Azure Data Bricks的免费资源学习云上大数据

在这个数据驱动的时代&#xff0c;大数据和云计算已成为推动技术创新和商业智能的关键因素。Azure Databricks&#xff0c;作为一个先进的云平台&#xff0c;为那些渴望深入了解和掌握这些技术的人们提供了一个理想的学习环境。我们这里将利用 Azure Databricks 的免费资源&…

C语言进阶指南(22)——文件管理函数

欢迎来到博主的专栏——C语言进阶指南 博主id&#xff1a;代码小豪 文章目录 一、文件输入输出函数fwritefread 二、文件定位函数文件位置fseekftellrewind 三、文件缓冲区fflush 一、文件输入输出函数 这些函数用于文件流&#xff0c;主要功能是将一连串的数据输出或输入&am…

python24.1.13for循环

对列表、字典、字符串等进行迭代 range

关系型数据库和MySQL概述

关系型数据库概述 数据持久化 - 将数据保存到能够长久保存数据的存储介质中,在掉电的情况下数据也不会丢失。数据库发展史 - 网状数据库、层次数据库、关系数据库、NoSQL 数据库、NewSQL 数据库。1970年,IBM的研究员E.F.Codd在_Communication of the ACM_上发表了名为_A Rela…

可盐可甜的红色马甲背心

膨体棉腈面料不易皱&#xff0c;搭配阿兰花菱形镂空设计 真的绝绝子&#xff0c;红色吸睛又美观 随便搭配一件衬衫去穿&#xff0c;自带文艺气息 氛围感直接拉满 出街拍照很出片&#xff0c;时髦又气质 女孩子的甜美&#xff0c;温柔等都可以突显 有喜欢的可以尝试一下哟…

Java课程设计团队博客 —— 基于网页的时间管理系统

博客目录 1.项目简介2.项目采用的技术3.功能需求分析4.项目亮点5.主要功能截图6.Git地址7.总结 Java团队博客分工 姓名职务负责模块个人博客孙岚组长 资源文件路径和tomcat服务器的相关配置。 前端的页面设计与逻辑实现的代码编写。 Servlet前后端数据交互的编写。 用户登录和…

数据结构实战:变位词侦测

文章目录 一、实战概述二、实战步骤&#xff08;一&#xff09;逐个比较法1、编写源程序2、代码解释说明&#xff08;1&#xff09;函数逻辑解释&#xff08;2&#xff09;主程序部分 3、运行程序&#xff0c;查看结果4、计算时间复杂度 &#xff08;二&#xff09;排序比较法1…

windows server 2012、2019服务器定时重启

手动设置定时任务 1.开始菜单&#xff0c;找到“计划任务程序”; 如果无法创建基本任务的话&#xff0c;可能是系统中的“Task Scheduler”服务没有启动&#xff0c;你可在运行中键入“ services.msc”&#xff0c;查看“Task Scheduler”服务是否被设置成了“已禁用”&#x…

一个个人博客应该怎么学?

一个个人博客应该怎么学&#xff1f; 好多零基础的同学们不知道怎么迈出第一步。 那么&#xff0c;就找一个现成的模板学一学呗&#xff0c;毕竟我们是高贵的Ctrl c v 工程师。 但是这样也有个问题&#xff0c;那就是&#xff0c;那些模板都&#xff0c;太&#xff01;复&…

哪个牌子的护眼台灯适合学生?2024护眼台灯推荐

不知道各位父母对孩子的视力健康有没有关注&#xff0c;我国儿童青少年的近视率高达52.7%&#xff0c;也就是说&#xff0c;平均是个儿童中就有五个儿童存在视力问题&#xff0c;而且近视发生年龄提前至3到7岁。作为一名眼部护理博主&#xff0c;孩子从小看书、看屏幕起&#x…

10分钟快速搭建个人博客、文档网站!

本文来分享 8 个现代化前端工具&#xff0c;帮你快速生成个人博客、文档网站&#xff01; VitePress VitePress 是一款静态站点生成器&#xff0c;专为构建快速、以内容为中心的网站而设计。简而言之&#xff0c;VitePress 获取用 Markdown 编写的源内容&#xff0c;为其应用…