Flink中自定义Source和Sink的使用

只要自定一个Source类实现SourceFunction接口,一个Sink类实现SinkFunction接口,就能正常使用自定义的Source和Sink,或者直接extends继承RichSourceFunction和RichSinkFunction,RichSinkFunction:多个open和close方法

1、自定义Source

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class Demo3SourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用自定义source
        DataStream<Integer> myDS = env.addSource(new MySource());
        myDS.print();

        env.execute();
    }
}

//自定义source
//实现SourceFunction接口
class MySource implements SourceFunction<Integer> {
    //在run方法中读取外部的数据,使用原生java代码
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (true) {
            ctx.collect(1);
            Thread.sleep(1000);
        }
    }

    //cancel方法是任务被取消是执行的,用于回收资源
    @Override
    public void cancel() {
    }
}

2、自定义Sink

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class Demo2MySink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);
        linesDS.addSink(new MySink());

        env.execute();
    }
}

//自定义Sink
class MySink implements SinkFunction<String> {
    //每一条数据执行一次
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("mySink:" + value);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/915646.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

指令存储和指令流水线

要求存储器的编址单位&#xff0c;首先观察到计算机采用的是32位定长指令字&#xff0c;因此一条指令就是32位&#xff0c;即4B&#xff0c;根据表中可知一条指令所占地址空间为08048104H-08048100H4H&#xff0c;因此所用的编制单位为字节&#xff08;B&#xff09; 将所有指令…

kafka管理工具

文章目录 前言一、Kafka Assistan1.1 描述1.2、配置安装 二、Conduktor2.1、描述2.2、配置安装 三、kafka-maneger3.1、描述3.2、配置安装3.3、命令启动3.4、[refer to](https://www.ctyun.cn/document/10000120/10033218#section-39755766f4910e4b) 前言 提示&#xff1a;这里…

JavaWeb常见注解

1.Controller 在 JavaWeb 开发中&#xff0c;Controller是 Spring 框架中的一个注解&#xff0c;主要用于定义控制器类&#xff08;Controller&#xff09;&#xff0c;是 Spring MVC 模式的核心组件之一。它表示该类是一个 Spring MVC 控制器&#xff0c;用来处理 HTTP 请求并…

axios平替!用浏览器自带的fetch处理AJAX(兼容表单/JSON/文件上传)

fetch 是啥&#xff1f; fetch 函数是 JavaScript 中用于发送网络请求的内置 API&#xff0c;可以替代传统的 XMLHttpRequest。它可以发送 HTTP 请求&#xff08;如 GET、POST 等&#xff09;&#xff0c;并返回一个 Promise&#xff0c;从而简化异步操作 基本用法 /* 下面是…

window任务计划记录中显示操作成功,但是代码只执行了第一句命令

一、创建定时任务 1. Windows键R 调出此窗口&#xff0c;输入compmgmt.msc &#xff08;调用的是计算机管理&#xff09; 2. 创建基本任务 在任务计划程序中右键 选择 创建基本任务。 输入任务名称及描述。 下一步中选择触发器的时间&#xff0c;这里选择每天。 选择开始时间&…

使用VSCode远程连接服务器并解决Neo4j无法登陆问题

摘要&#xff1a;本文介绍了如何通过VSCode连接内网部署的Neo4j服务器&#xff0c;并启动服务。在访问Neo4j登录界面时&#xff0c;遇到了端口映射问题导致无法登录。通过手动添加7687端口的映射后&#xff0c;成功登录Neo4j。 我在内网部署了一台服务器&#xff0c;并在其上运…

【异常解决】Linux shell报错:-bash: [: ==: 期待一元表达式 解决方法

博主介绍&#xff1a;✌全网粉丝21W&#xff0c;CSDN博客专家、Java领域优质创作者&#xff0c;掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域✌ 技术范围&#xff1a;SpringBoot、SpringCloud、Vue、SSM、HTML、Nodejs、Python、MySQL、PostgreSQL、大数据、物…

游戏引擎学习第四天

视频参考:https://www.bilibili.com/video/BV1aDmqYnEnc/ BitBlt 是 Windows GDI&#xff08;图形设备接口&#xff09;中的一个函数&#xff0c;用于在设备上下文&#xff08;device context, DC&#xff09;之间复制位图数据。BitBlt 的主要用途是将一个图像区域从一个地方复…

七牛云上传图片成功,但是无法访问显示{error : document not found}

上传图片成功&#xff0c;但是访问不了的问题&#xff0c;直接把地址放进浏览器显示{error : document not found}&#xff0c;直接访问 DCNF 404是符合预期的&#xff0c;因为还没有去空间复制外链&#xff0c;要访问实际存在的资源才可以的. 配置区域和访问域名 设置没问题了…

通过投毒Bingbot索引挖掘必应中的存储型XSS

简介 在本文中&#xff0c;我将讨论如何通过从外部网站对Bingbot进行投毒&#xff0c;来在Bing.com上实现持久性XSS攻击。 什么是存储型或持久性XSS&#xff1f;存储型攻击指的是将恶意脚本永久存储在目标服务器上&#xff0c;例如数据库、论坛、访问日志、评论栏等。受害者在…

84.7k Star!Excalidraw:开源的在线白板工具,具备手绘风格和实时协作功能

❤️ 如果你也关注大模型与 AI 的发展现状&#xff0c;且对大模型应用开发非常感兴趣&#xff0c;我会快速跟你分享最新的感兴趣的 AI 应用和热点信息&#xff0c;也会不定期分享自己的想法和开源实例&#xff0c;欢迎关注我哦&#xff01; &#x1f966; 微信公众号&#xff…

让Git走代理

有时候idea提交代码或者从github拉取代码&#xff0c;一直报错超时或者:Recv failure: Connection was reset,下面记录一下怎么让git走代理从而访问到github。 1.打开梯子 2.打开网络和Internet设置 3.设置代理 记住这个地址和端口 4.打开git bash终端 输入以下内容 git c…

CSS:导航栏三角箭头

用CSS实现导航流程图的样式。可根据自己的需求进行修改&#xff0c;代码精略的写了一下。 注&#xff1a;场景一和场景二在分辨率比较低的情况下会有一个1px的缝隙不太优雅&#xff0c;自行处理。有个方法是直接在每个外面包一个DIV&#xff0c;用动态样式设置底色。 场景一、…

第4章-计划 4.3 订计划、勤跟踪、要闭环

4.3 订计划、勤跟踪、要闭环 1.制订好的第一版计划先要基线化&#xff0c;确保有据可依2.计划要监督执行&#xff0c;发现延期时要“喊出来”3.计划要赶得上变化4.资源保障是计划能够执行的依赖 坚定执行制订好的计划&#xff0c;监督执行效果&#xff0c;计划产生偏差时及时制…

在 WPF 中,如何实现数据的双向绑定?

在 WPF 中&#xff0c;数据绑定是一个非常重要的特性&#xff0c;它允许 UI 与数据源之间自动同步。双向绑定是一种常见的绑定方式&#xff0c;当数据源更新时&#xff0c;UI 会自动更新&#xff1b;同样&#xff0c;当 UI 中的元素&#xff08;如文本框&#xff09;发生改变时…

Java面向对象编程进阶之包装类

Java面向对象编程进阶之包装类 一、为什么要使用包装类二、掌握基本数据类型与包装类之间的转换1、为什么需要转换&#xff1f;2、如何转换&#xff1f; 三、String与基本数据类型、包装类之间的转换1、案例2、特别注意 一、为什么要使用包装类 为了使得基本类型的数据变量具备…

基于Spring Boot与Redis的令牌主动失效机制实现

目录 前言1. 项目结构和依赖配置1.1 项目依赖配置1.2 Redis连接配置 2. 令牌主动失效机制的实现流程2.1 登录成功后将令牌存储到Redis中2.2 使用拦截器验证令牌2.3 用户修改密码后删除旧令牌 3. Redis的配置与测试4. 可能的扩展与优化结语 前言 在现代Web系统中&#xff0c;用…

yolov8-cls的onnx与tensorrt推理

本文不生产技术,只做技术的搬运工! 前言 最近需要使用yolov8-cls进行模型分类任务,但是使用ultralytics框架去部署非常不方便,因此打算进行onnx或者tensorrt去部署,查看了很多网上的帖子,并没有发现有完整复现yolov8-cls前处理(不需要后处理)的"轮子",通过自己debug…

Acrobat Pro DC 2023(pdf免费转化word)

所在位置 通过网盘分享的文件&#xff1a;Acrobat Pro DC 2023(64bit).tar 链接: https://pan.baidu.com/s/1_m8TT1rHTtp5YnU8F0QGXQ 提取码: 1234 --来自百度网盘超级会员v4的分享 安装流程 打开安装所在位置 进入安装程序 找到安装程序 进入后点击自定义安装&#xff0c;这里…

VMware和CentOS 7.6 Linux操作系统的安装使用

1. 安装VMware 安装VMware之前&#xff0c;有些电脑是需要去BIOS里修改设置开启cpu虚拟化设备支持才能安装。如果运气不好在安装过程中安装不了的话就自行百度吧。 打开 VMware 的官网: https://www.vmware.com/ 点击 product&#xff0c;往下滑找到 see desktop hypeerviso…