大数据-玩转数据-Flink RedisSink

一、添加Redis Connector依赖

具体版本根据实际情况确定

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

二、启动redis

参见大数据-玩转数据-Redis 安装与使用

三、编写代码

package com.lyh.flink06;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
        KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer key) throws Exception {
                return key.intValue();
            }
        });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop100")
                .setPort(6379)
                .setMaxTotal(100)
                .setMaxIdle(10)
                .setMinIdle(2)
                .setTimeout(10*1000)
                .setDatabase(0)
                .setPassword("redis")
                .build();


        keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(Integer integer) {
                return integer.toString();
            }

            @Override
            public String getValueFromData(Integer integer) {
                return integer.toString();
            }
        }));
        env.execute();
    }
}

可以根据要写入的redis的不同数据类型进行调整

四、查询结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/78217.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

两个案例熟悉String的基本操作

1、第一个案例 Java语言规范要求完全相同的字符串字面量&#xff0c;应该包含同样的Unicode字符序列&#xff08;包含同一份码点序列的常量&#xff09;&#xff0c;并且必须是指向同一个String类实例。 package string; public class StringTest4 {public static void main(St…

AutoSAR系列讲解(深入篇)13.3-Mcal Dio配置

目录 一、Dio port配置 二、Dio pin配置 一、Dio port配置 同之前的Port一样,双击进入Dio配置界面后会看到几乎差不多的配置界面。General和Port类似,我们不再赘述,主要讲解Dio的配置 1. 其实Dio并没有什么实质的作用,主要起到了一个重命名的功能。双击DioConfig_0进入下…

文本图片怎么转Excel?分享一些好用的方法

在处理数据时&#xff0c;Excel 是一个非常强大的工具&#xff0c;但有时候需要将文本和图片转换为 Excel 格式&#xff0c;这可能会让人感到困惑。在本文中&#xff0c;我们将介绍一些好用的方法&#xff0c;以便您能够轻松地将文本和图片转换成 Excel 格式。 将文本图片为Exc…

高性能本地缓存Ristretto(四)—— NumCounters 与 MaxCost参数的设置

ristretto 参数 我在使用ristretto时&#xff0c;对于参数的设置有些疑问。主要是 NumCounters &#xff0c;MaxCost 分别表示什么含义&#xff0c;以及如何确定其数值的问题。 在此记录并分享一下&#xff0c;欢迎各位批评指正&#xff0c;谢谢 官方的指导 先看一下官方的…

前端开发,怎么解决浏览器兼容性问题? - 易智编译EaseEditing

解决浏览器兼容性问题是前端开发中常见的挑战之一。不同的浏览器可能对网页元素的渲染和功能支持有所不同&#xff0c;因此需要采取一些策略来确保您的网页在不同浏览器上都能正常运行和呈现。以下是一些解决浏览器兼容性问题的方法和策略&#xff1a; 使用CSS Reset&#xff…

MMdetection在VisDrone2019上训练FCOS和CenterNet

配置环境 Python 3.5>PyTorch 1.1>CUDA 9.0NCCL 2>GCC 4.9mmcv‘’ 把mmdetection的代码下载下来 git clone https://github.com/open-mmlab/mmdetection.git进入这个mmdetection文件,准备编译mmdetection的文件 cd mmdetection 装一下下面这些包&#xff0c; #…

idea cannot download sources 解决方法

问题 点击class文件右上角下载源码失败 解决方案 找到idea terminal 控制台cd 至maven工程执行 mvn dependency:resolve -Dclassifiersources

ElasticSearch相关概念

1、概述 先说Elasticsearch的文件存储&#xff0c;Elasticsearch是面向文档型数据库&#xff0c;一条数据在这里就是一个文档&#xff0c;用JSON作为文档序列化的格式&#xff0c;比如下面这条用户数据&#xff1a; {"name" : "John","sex"…

web连接桌面打开gptmap

一&#xff1a;环境配置 需要的材料&#xff1a; python-3.10.4 我使用的是这个版本的&#xff0c;3.8.10 该版本和以下版本组件组合&#xff0c;验证过能正常运行&#xff08;python 3.6.8测试异常&#xff09; websockify 该项目有python版本和node js版本 noVNC 形式的app…

Android-网络访问技术Retrofit浅析

Retrofit是一种基于注解的网络请求库&#xff0c;专门用于在Android应用中进行网络访问。它使用简洁的方式定义了网络请求的接口&#xff0c;并自动将请求结果解析为Java对象。Retrofit的核心原理是利用了Java的动态代理技术&#xff0c;将网络请求接口的注解信息转化为具体的网…

JDK 20下载与安装 (Windows 11系统)

JDK(Java Development Kit)是 Java 语言的软件开发工具包&#xff0c;主要用于移动设备、嵌入式设备上的java应用程序。JDK是整个java开发的核心&#xff0c;它包含了JAVA的运行环境&#xff08;JVMJava系统类库&#xff09;和JAVA工具。 JDK 20 下载 JDK 官网路径&#xff1…

中期国际:MT4数据挖掘与分析方法:以数据为导向,制定有效的交易策略

在金融市场中&#xff0c;制定有效的交易策略是成功交易的关键。而要制定一份可靠的交易策略&#xff0c;数据挖掘与分析方法是不可或缺的工具。本文将介绍如何以数据为导向&#xff0c;利用MT4进行数据挖掘与分析&#xff0c;从而制定有效的交易策略。 首先&#xff0c;我们需…

一次网络不通“争吵“引发的思考

作者&#xff1a; 郑明泉、余凯 为啥争吵&#xff0c;吵什么&#xff1f; “你到底在说什么啊&#xff0c;我K8s的ecs节点要访问clb的地址不通和本地网卡有什么关系…” 气愤语气都从电话那头传了过来&#xff0c;这时电话两端都沉默了。过了好一会传来地铁小姐姐甜美的播报声…

消息队列

消息队列是和管道并列的概念&#xff0c;也是进程间通信IPC中的一种。 本文参考&#xff1a;消息队列&#xff08;定义、结构、如何创建、消息队列的发送与接收、发送与接收实例&#xff09;_咋么又饿了的博客-CSDN博客 基本概念 消息队列是一种先进先出的队列型数据结构&…

Stable Diffusion核心算法DDPM解析

DDPM&#xff1a;Denoising Diffusion Probabilistic Model&#xff0c;去噪扩散概率模型 本文参考&#xff1a;一个视频看懂扩散模型DDPM原理推导|AI绘画底层模型_哔哩哔哩_bilibili 1、大概原理 从右往左为正向加噪过程&#xff0c;从左往右为逆向降噪过程。 在正向过程中不…

专访 BlockPI:共建账户抽象未来的新一代 RPC 基础设施

在传统 RPC 服务板块上&#xff0c;开发者一直饱受故障风险、运行环境混乱等难题的折磨。实现 RPC 服务的去中心化&#xff0c;且保持成本优势和可扩展性&#xff0c;始终是区块链基础设施建设的重要命题之一。从 2018 年观察中心化 RPC 供应商服务现状开始&#xff0c;BlockPI…

Kubernetes 企业级高可用部署

1、Kubernetes高可用项目介绍 单master节点的可靠性不高&#xff0c;并不适合实际的生产环境。Kubernetes 高可用集群是保证 Master 节点中 API Server 服务的高可用。API Server 提供了 Kubernetes 各类资源对象增删改查的唯一访问入口&#xff0c;是整个 Kubernetes 系统的数…

使用断言抛异常操作

使用断言抛异常操作 常用方法 public static void isNull(Nullable Object object, String message) {if (object ! null) {throw new IllegalArgumentException(message);}}public static void notNull(Nullable Object object, String message) {if (object null) {throw n…

问道管理:股票打板风险大吗?怎么降低打板风险?

在股票市场上&#xff0c;一些出资者喜爱低吸&#xff0c;一些喜爱打板&#xff0c;那么&#xff0c;股票打板危险大吗&#xff1f;怎么下降打板危险&#xff1f;下面问道管理为大家准备了相关内容&#xff0c;以供参阅。 股票打板通常是指在个股涨停时买入&#xff0c;这种买入…

您可以购买 Banana Pi BPI-CM2 而不是 Raspberry Pi CM4。它提供什么?

最近&#xff0c;Banana Pi&#xff08;SINOVOIP&#xff09;推出了Banana Pi BPI-CM2系统级模块&#xff08;SoM&#xff09;。BPI-CM2 是类似于Raspberry Pi CM4 的计算模块&#xff0c;提供一系列令人印象深刻的功能。Banana BPI-CM2 SoM 采用Rockchip RK3568四核 Cortex-A5…