Flink之RedisSink

在Flink开发中经常会有将数据写入到redis的需求,但是Flink官方并没有对应的扩展包,这个时候需要我们自己编译对应的jar资源,这个时候就用到了bahir,barhirapahce的开源项目,是专门给sparkflink提供扩展包使用的,bahir官网,这篇文章就介绍下如何自己编译RedisSink扩展包.

  • 下载源码包
    通过下图进入到GitHub
    在这里插入图片描述
    选择clonedownload源码都可以,如下图
    在这里插入图片描述
  • 编译源码包
    下载好源码后,maven会自动下载对应的依赖项
    • 删除不需要的子项目
      因为我们这里需要编译redis对应的扩展包,所以其他的子项目都可以删除掉,下图中红色框标注的都可以删除
      在这里插入图片描述
    • 修改pom文件
      删除掉不需要的子项目后,在pom文件中也要删除对应的子项目配置
      <!-- 这里只保留这一个模块就可以了 -->
      <modules>
          <module>flink-connector-redis</module>
      </modules>
      
      修改完成模块配置后,还需要修改对应的flinkscala版本依赖,这个根据自己实际的开发环境进行修改
       <properties>
          <!-- 修改这里的版本就可以 -->
          <!-- Flink version -->
          <flink.version>1.15.3</flink.version>
          <scala.binary.version>2.12</scala.binary.version>
          <scala.version>2.12.11</scala.version>
      </properties>
      
      这些都完成后就可以通过maven下载对应的依赖了.
  • 编译安装
    依赖下载完成后pom文件中可能会有几处是报错的状态,如下图
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    以上几处错误无需理会,不影响扩展包的编译.
    接下来通过maveninstall将扩展包编译并安装到本地的maven资源库,如下图
    在这里插入图片描述
    编译完成后我们就可以在自己的flink项目中引入对应的扩展包了
        <!-- Redis connector -->
        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis</artifactId>
            <version>1.2-SNAPSHOT</version>
        </dependency>
    
    上面依赖中groupId是固定的,artifactId要根据flink-connector-redis项目中的pom文件中artifactId来拿,同样version也是一样,到这里扩展包的问题就已经解决了.
  • 代码
    其实在GitHub上已经给了代码示例单机(java,scala)、集群(java,scala)的代码模板都是有的,下面就以单机redis作为示例.
    这里我们要创建一个类实现RedisMapper
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/4
     * @Description: 测试
     **/
    public class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        // 这个方法是选择使用哪种命令插入数据到Redis
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
        }
    
        @Override
        // 这个方法是选择哪个作为Key
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }
    
        @Override
        // 这个方法是选择哪个作为Value
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/4
     * @Description: 测试
     **/
    public class FlinkRedisSink {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 这里使用的是自定义数据源为了方便测试
            DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
            // 将数据转换成Tuple的形式
            SingleOutputStreamOperator<Tuple2<String, String>> tuple2Stream = customizeSource
                                 .map((MapFunction<CustomizeBean, Tuple2<String, String>>) value -> Tuple2.of(value.getAge() + "-" + value.getHobbit(), value.toString()))
                                 .returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));// Tuple2是flink中提供的类型java无法自动推断,所以加上这段代码
            // 配置Redis
            FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                    .setHost("127.0.0.1") // redis服务器地址
                    .setPassword("password") // redis密码
                    .build();
            // 添加Sink
            tuple2Stream.addSink(new RedisSink<Tuple2<String, String>>(conf, new RedisExampleMapper());
            env.execute("Redis Sink");
        }
    }
    
    到这里代码就结束了,具体应用根据实际业务需求进行更改.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/60464.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Prometheus服务器、Prometheus被监控端、Grafana、Prometheus服务器、Prometheus被监控端、Grafana

day03 day03Prometheus概述部署Prometheus服务器环境说明&#xff1a;配置时间安装Prometheus服务器添加被监控端部署通用的监控exporterGrafana概述部署Grafana展示node1的监控信息监控MySQL数据库配置MySQL配置mysql exporter配置mysql exporter配置prometheus监控mysql自动…

spring security + oauth2 使用RedisTokenStore 以json格式存储

1.项目架构 2.自己对 TokenStore 的 redis实现 package com.enterprise.auth.config;import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis…

Spring Boot、Spring Cloud、Spring Alibaba 版本对照关系及稳定兼容版本

Spring Boot、Spring Cloud、Spring Alibaba 版本对照关系及稳定兼容版本 引言 在 Java 生态系统中&#xff0c;Spring Boot、Spring Cloud 和 Spring Alibaba 是非常流行的框架&#xff0c;它们提供了丰富的功能和优雅的解决方案。然而&#xff0c;随着不断的发展和更新&…

如何建立含有逻辑删除字段的唯一索引

业务场景 在实际工作当中&#xff0c;遇到一个场景&#xff0c;就是在用户注册时&#xff0c;名字要全局唯一&#xff0c;当然&#xff0c;我们是可以对用户进行删除的&#xff0c;你会怎么去做&#xff1f; 分析 一般来说&#xff0c;我们可以在用户注册请求时&#xff0c…

基于时空RBF神经网络的混沌时间序列预测(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【阵列信号处理】空间匹配滤波器、锥形/非锥形最佳波束成形器、样本矩阵反演 (SMI) 研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

chrome扩展在popup、background、content之间通信解决传输文件问题

文章目录 背景介绍案例介绍代码示例popup页面&#xff0c;上传文件页面popup页面&#xff0c;js上传代码&#xff0c;file文件转base64background监听消息&#xff0c;base64转file文件&#xff0c;axios上传 附-转base64后直接下载 背景介绍 示例扩展API版本MV2。 以弹…

3d 地球与卫星绕地飞行

1 创建场景 2 创建相机 3 创建地球模型 4 创建卫星中心 5 创建卫星圆环及卫星 6 创建控制器 7 创建渲染器 <template><div class"home3dMap" id"home3dMap"></div> </template><script> import * as THREE from three impo…

GD32F103*固件库移植μCOS-Ⅲ详细教程与解析(最终版本已上传,可下载)

GD32F103*固件库移植μCOS-Ⅲ详细教程与解析&#xff08;最终版本已上传&#xff0c;可下载&#xff09; GD32F103*移植μCOS-Ⅲ详细教程与解析&#xff0c;欢迎指正 文章目录 GD32F103*固件库移植μCOS-Ⅲ详细教程与解析&#xff08;最终版本已上传&#xff0c;可下载&#x…

appium自动爬取数据

爬取类容&#xff1a;推荐知识点中所有的题目 爬取方式&#xff1a;appium模拟操作获取前端数据 入门级简单实现&#xff0c;针对题目和答案是文字内容的没有提取出来 适用场景;数据不多&#xff0c;参数加密&#xff0c;反爬严格等场景 from appium import webdriver impor…

神经概率语言模型

本文主要参考《A Neural Probabilistic Language Model》这是一篇很重要的语言模型论文,发表于2003年。主要贡献如下: 提出了一种基于神经网络的语言模型&#xff0c;是较早将神经网络应用于语言模型领域的工作之一&#xff0c;具有里程碑意义。采用神经网络模型预测下一个单词…

opencv37-形态学操作-开运算(先腐蚀后膨胀)cv2.morphologyEx()-参数 op 设置为“cv2.MORPH_OPEN”

腐蚀操作和膨胀操作是形态学运算的基础&#xff0c;将腐蚀和膨胀操作进行组合&#xff0c;就可以实现开运算、闭运算&#xff08;关运算&#xff09;、形态学梯度&#xff08;MorphologicalGradient&#xff09;运算、礼帽运算&#xff08;顶帽运算&#xff09;、黑帽运算、击中…

uniapp:图片验证码检验问题处理

图形验证码功能实现 uniapp&#xff1a;解决图形验证码问题及利用arraybuffer二进制转base64格式图片&#xff08;后端传的图片数据形式&#xff1a;x00\x10JFIF\x00\x01\x02\x00…&#xff09;_❆VE❆的博客-CSDN博客 UI稿&#xff1a; 需求&#xff1a;向后端请求验证码图片&…

Stable Diffusion AI绘画学习指南【本地环境搭建win+mac】

一、硬件配配置要求 系统&#xff1a;windows 10 / Mac os 硬盘&#xff1a;C 盘预留 15GB 以上&#xff0c;其他盘 50GB 以上,Stable Ddiffusion的很多大模型都是以 GB 起步。 显卡&#xff1a;4GB 以上&#xff0c;建议 8GB, 效率高&#xff0c;能玩大尺寸的图 CPU&…

[MAUI]模仿微信“按住-说话”的交互实现

今天使用这个控件&#xff0c;做一个模仿微信“按住-说话”的小功能&#xff0c;最终效果如下&#xff1a; 使用.NET MAUI实现跨平台支持&#xff0c;本项目可运行于Android、iOS平台。 创建页面布局 新建.NET MAUI项目&#xff0c;命名HoldAndSpeak MainPage.xaml中创建一个…

Flink读取mysql数据库(java)

代码如下: package com.weilanaoli.ruge.vlink.flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org…

【数据结构|二叉树遍历】递归与非递归实现前序遍历、中序遍历、后序遍历

递归与非递归实现二叉树的前序遍历、中序遍历、后序遍历。 二叉树图 定义 前序遍历&#xff08;Preorder Traversal&#xff09;&#xff1a; 前序遍历的顺序是先访问根节点&#xff0c;然后按照先左后右的顺序访问子节点。对于上面的二叉树&#xff0c;前序遍历的结果是&…

【React】搭建React项目

最近自己在尝试搭建react项目&#xff0c;其实react项目搭建没有想象中的那么复杂&#xff0c;我们只需要使用一个命令把React架子搭建好&#xff0c;其他的依赖可以根据具体的需求去安装&#xff0c;比如AntDesignMobile的UI框架&#xff0c;执行npm install antd-mobile --sa…

【计算机网络】NAT及Bridge介绍

OSI七层模型 七层模型介绍及举例 为通过网络将人类可读信息通过网络从一台设备传输到另一台设备&#xff0c;必须在发送设备沿 OSI 模型的七层结构向下传输数据&#xff0c;然后在接收端沿七层结构向上传输数据。 数据在 OSI 模型中如何流动 库珀先生想给帕尔梅女士发一封电…

android kernel移植5-RK3568

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言1.添加开发板默认配置文件前言 前面我们已经学会了移植uboot,其实就是把瑞芯微的关于uboot的一些文件的名字和编译指定的文件改为自己定义的问价和名字,那么接下来的Android kernel其实也是…