实时流式计算 kafkaStream

文章目录

  • 实时流式计算
  • Kafka Stream
  • Kafka Streams 的关键概念
  • KStream
  • Kafka Stream入门案例编写
  • SpringBoot 集成 Kafka Stream


实时流式计算

一般流式计算会与批量计算相比较

在这里插入图片描述

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。
流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

应用场景

  • 日志分析
    网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策
  • 大屏看板统计
    可以实时的查看网站注册数量,订单数量,购买数量,金额等。
  • 公交实时数据
    可以随时更新公交车方位,计算多久到达站牌等
  • 实时文章分值计算
    头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

技术方案选型

  • Hadoop
  • Apche Storm
  • Flink
  • Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

Kafka Stream

Kafka Stream:提供了对存储于 Kafka内 的数据进行流式处理分析的功能

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

在这里插入图片描述

Kafka Streams 的关键概念

在这里插入图片描述

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

KStream

  • 数据结构类似于map,如下图,key-value 键值对

在这里插入图片描述

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。

在这里插入图片描述

Kafka Stream入门案例编写

  1. 需求分析,求单词个数(word count)

在这里插入图片描述

  1. 创建原生的 kafka staream 入门案例

导入依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>

在这里插入图片描述

在这里插入图片描述

package com.heima.kafka.sample;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * 流式处理
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {

        //kafka的配置信心
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");

        //stream 构建器
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        //流式计算
        streamProcessor(streamsBuilder);


        //创建kafkaStream对象
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),prop);
        //开启流式计算
        kafkaStreams.start();
    }

    /**
     * 流式计算
     * 消息的内容:hello kafka  hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        /**
         * 处理消息的value
         */
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //按照value进行聚合处理
                .groupBy((key,value)->value)
                //时间窗口
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //统计单词的个数
                .count()
                //转换为kStream
                .toStream()
                .map((key,value)->{
                    System.out.println("key:"+key+",vlaue:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
    }
}
  1. 测试准备
  • 使用生产者在 topic 为:itcast_topic_input中发送多条消息
  • stream 接收 itcast_topic_input 的数据,进行聚合操作后,将处理结果发送到 itcast_topic_out
  • 使用消费者接收 topic 为:itcast_topic_out

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

SpringBoot 集成 Kafka Stream

  1. 配置

在这里插入图片描述

package com.heima.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

application.yml

在这里插入图片描述

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}
  1. 在配置类中定义方法
  • 可注入StreamsBuilder
  • 返回值必须是KStream且放入spring容器中

在这里插入图片描述

package com.heima.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder){
        //创建kstream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->{
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                })
                //发送消息
                .to("itcast-topic-out");
        return stream;
    }
}
  1. 测试

启动 springboot 项目即可自动监听

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/214435.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WEB服务器配置与HTTP分析

目录 实验目的&#xff1a; 实验要求&#xff1a; 实验原理&#xff1a; 实验步骤&#xff1a; 步骤1&#xff1a;创建拓扑 步骤2&#xff1a;为PC、Client和Server配置IPv4地址、子网掩码和域名服务器 步骤3&#xff1a;启动设备和服务器 步骤4&#xff1a;测试PC-1、C…

【Qt开发流程】之自定义语法高亮和使用HTML语法

描述 语法高亮&#xff08;Syntax Highlighting&#xff09;是一种在编辑器中突出显示代码语法元素的技术&#xff0c;使其更易于阅读和理解。 Qt提供了一个功能齐全的语法高亮框架&#xff0c;支持多种语言和格式&#xff0c;可以自定义颜色和样式。 对于使用Qt的开发人员来说…

HADOOP::Fsimage和Edits解析

NameNode被格式化之后&#xff0c;将在/opt/module hadoop-3.1.3/data/tmp/dfs/name/curent目录 中产生如下文件 fsimage_ 0000000000000000000 fsimage_ 0000000000000000000.md5 seen_txid VERSION (1) Fsimage文件: HDFS文件系统元数据的一个永久性的检查点&#xff0…

使用pytorch从零开始实现迷你GPT

生成式建模知识回顾: [1] 生成式建模概述 [2] Transformer I&#xff0c;Transformer II [3] 变分自编码器 [4] 生成对抗网络&#xff0c;高级生成对抗网络 I&#xff0c;高级生成对抗网络 II [5] 自回归模型 [6] 归一化流模型 [7] 基于能量的模型 [8] 扩散模型 I, 扩散模型 II…

机器学习决策树ID3算法

1、先去计算总的信息量 2、根据不同指标分别计算对应的信息增益 3、根据算出的信息增益来选择信息增益最大的作为根结点 4、天气中选择一个继续上述过程 5、决策树划分结束

solidity实现ERC20代币标准

文章目录 1、以太坊 - 维基百科2、IERC203、ERC204、Remix 编译部署 1、以太坊 - 维基百科 以太坊&#xff08;Ethereum&#xff09;是一个去中心化的开源的有智能合约功能的公共区块链平台。以太币&#xff08;ETH 或 Ξ&#xff09;是以太坊的原生加密货币。截至2021年12月&a…

克服.360勒索病毒:.360勒索病毒的解密和预防

导言: 在数字化的今天&#xff0c;数据安全问题变得愈发棘手。.360勒索病毒是当前网络空间的一场潜在灾难&#xff0c;对于这个威胁&#xff0c;了解应对之道和采取切实的预防措施至关重要。如果您正在经历勒索病毒的困境&#xff0c;欢迎联系我们的vx技术服务号&#xff08;s…

华为手环配置技巧

前言 华为手环作为生活健康辅助设备发挥不可忽视的作用&#xff0c;但每次更换手环后需要重新配置。华为手环不仅有健康监测、消息通知、天气推送、离线支付、公交卡、运动锻炼、等功能&#xff0c;还有倒计时、计时器、手电筒、闹钟、等小工具。下文介绍如何进行配置。 配置…

C/C++学生选课/排课系统[2023-12-3]

问题描述&#xff1a;根据我校自动化专业的部分必修及选修课信 息&#xff0c;设计一个学生选课/排课系统。 基本要求&#xff1a; 1、从文件读入课程信息&#xff1b; 2、从键盘输入拟添加的选修课信息&#xff1b; 3、删除已选的选修课(1门或多门) &#xff1b; 4、输出已…

【小沐学Python】网络爬虫之lxml

文章目录 1、简介2、安装3、基本功能3.1 lxml.etree3.2 解析HTML网页3.3 读取并解析HTML文件3.4 提取所有a标签内的文本信息3.5 树迭代3.6 序列化3.7 元素以字典的形式携带属性3.8 元素包含文本 4、代码测试4.1 lxml解析网页4.2 使用xpath获取所有的文本4.3 使用xpath获取 clas…

html动漫网页设计分享 紫罗兰永恒花园网页作业成品带视频,注册登录,表格,表单

html5静态网页设计要是用HTML DIVCSS JS等来完成页面的排版设计,一般的网页作业需要融入以下知识点&#xff1a;div布局、浮动、定位、高级css、表格、表单及验证、js轮播图、音频 视频 Flash的应用、ul li、下拉导航栏、鼠标划过效果等知识点&#xff0c;学生网页作业源码可以…

大数据-hive

简介 hive是基于Hadoop的一个数据仓库工具&#xff0c;用来进行数据提取、转化、加载&#xff0c;这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。hive数据仓库工具能将结构化的数据文件映射为一张数据库表&#xff0c;并提供SQL查询功能&#xff0c;能将SQL…

一站式自动化:Ansible Playbook的全面学习之旅

1 Playbook介绍 1.1 Playbook介绍 playbook 是由一个或多个play组成的列表 Playbook 文件使用YAML来写的 1.2 YAML 1.2.1 介绍 是一种表达资料序列的格式&#xff0c;类似XML Yet Another Markup Language 2001年首次发表 www.yaml.org 1.2.2 特点 可读性好 和脚本语言交…

探究两个互联网时代的差异,Web 2.0 与 Web 3.0 区别

Web 2.0 的特征 首先我们来了解一下 Web 2.0 的特征都有哪些。 用户生成内容&#xff1a;Web 2.0 时代以用户生成内容为特征&#xff0c;用户可以轻松地在网络上分享、创建和编辑信息。社交媒体平台、博客等网站的兴起使得用户成为信息的创造者&#xff0c;网络逐渐从被动浏览…

华为手环关闭智能适时测量

问题 使用华为手环并使用华为创新研究APP后&#xff0c;会自动打开智能适时测量开关&#xff0c;此开关开启后&#xff0c;手环会在睡眠时间自动测量血氧&#xff0c;增加手环功耗从而影响续航&#xff0c;用户可根据自身需求决定是否开启&#xff0c;下文介绍如何找到此开关。…

Vue+ElementUI+C#前后端分离:监控长耗时任务的实践

想象一下&#xff0c;我们正在构建一个Web应用&#xff0c;需要实现一个数据报告的导出功能。这听起来很简单&#xff0c;不是吗&#xff1f;但是&#xff0c;随着深入开发&#xff0c;我们意识到导出过程比预期的要复杂和耗时得多。由于报告的数据量巨大&#xff0c;后端需要花…

qt-C++笔记之识别点击鼠标右键、点击位置以及Qt坐标系详解

qt-C笔记之识别点击鼠标右键、点击位置以及Qt坐标系详解 code review! 文章目录 qt-C笔记之识别点击鼠标右键、点击位置以及Qt坐标系详解1.示例运行2.event->pos();详解3.event->pos()的坐标系原点4.Qt中的坐标系详解5.QMainWindow::mousePressEvent(event);详解 1.示例…

ssm医药进出口交易系统源码和论文

ssm医药进出口交易系统源码和论文726 首先,论文一开始便是清楚的论述了系统的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了系统的需求基础上需要进一步地设计系统,主要包罗软件架构…

LangChain 19 Agents Reason+Action自定义agent处理OpenAI的计算缺陷

LangChain系列文章 LangChain 实现给动物取名字&#xff0c;LangChain 2模块化prompt template并用streamlit生成网站 实现给动物取名字LangChain 3使用Agent访问Wikipedia和llm-math计算狗的平均年龄LangChain 4用向量数据库Faiss存储&#xff0c;读取YouTube的视频文本搜索I…

USART的PAL库编程

USART驱动的工作原理 总结一下我们之前使用中断的方式来进行数据的发送和接收 如果收到数据数据在RDR寄存器中 RXNE标志位就从0到1触发中断 进入中断服务函数 把数据缓存在队列中 然后在到进程函数中断接收数据函数中进行出队处理 发送数据就是把中断关闭&#xff08;标志位TXE…