kafka消息队列最常用的两种模式,以及应用场景

目录

一、发布-订阅模式

二、点对点模式

三、应用场景


 

一、发布-订阅模式

发布-订阅模式是最常见的消息传递模式,其中消息发布者将消息发送到一个或多个主题(Topic),而订阅者可以选择订阅一个或多个主题来接收消息。每个订阅者都可以独立地消费消息,而发布者和订阅者之间没有直接的联系。

在Kafka中,使用KafkaProducer类进行消息发布,KafkaConsumer类进行消息订阅。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class PubSubExample {
    private static final String TOPIC = "my_topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Kafka Producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Publish messages
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Error publishing message: " + exception.getMessage());
                    } else {
                        System.out.println("Message published successfully: " + metadata.offset());
                    }
                }
            });
        }

        producer.close();

        // Kafka Consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(TOPIC));

        // Consume messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                // Process the message
            }
        }
    }
}

 

二、点对点模式

点对点模式中,消息发送者将消息发送到一个指定的队列(Queue),而消息接收者从相同的队列中接收消息。每个消息只能被一个接收者消费。

在Kafka中,点对点模式可以通过创建单个消费者组来实现。以下是一个简单的Java代码示例:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class PointToPointExample {
    private static final String QUEUE = "my_queue";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
        // Kafka Producer
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

        // Publish messages
        for (int i = 0; i < 10; i++) {
            String message = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(QUEUE, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Error publishing message: " + exception.getMessage());
                    } else {
                        System.out.println("Message published successfully: " + metadata.offset());
                    }
                }
            });
        }

        producer.close();

        // Kafka Consumer
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my_consumer_group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
        consumer.subscribe(Collections.singletonList(QUEUE));

        // Consume messages
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: " + record.value());
                // Process the message
                consumer.commitAsync();
            }
        }
    }
}

以上代码示例演示了如何使用Kafka的Java客户端库进行发布和订阅消息以及点对点消息传递。请注意,代码中的BOOTSTRAP_SERVERS需要根据你的实际环境进行配置。

 

三、应用场景

Kafka消息队列具有高吞吐量、低延迟、可扩展性等特点,因此广泛应用于以下场景:

  1. 日志收集和数据管道:Kafka可以用作集中式日志收集系统,可以将不同服务、应用程序、服务器生成的日志集中到一个中心化的消息队列中,再通过消费者进行处理、分析和存储。同时,Kafka还可以作为数据管道,将不同数据源的数据通过消息队列进行传输和处理。

  2. 实时流处理:Kafka与流处理框架(如Apache Flink、Apache Spark)结合使用,可以实现实时的数据流处理。Kafka可以作为输入源和输出源,将数据流传输给流处理框架进行实时分析、计算和处理。

  3. 微服务架构:Kafka可以用作微服务之间的异步通信机制,不同的微服务各自独立地生产和消费消息,实现解耦和扩展性。同时,Kafka还可以用于实现事件驱动架构,不同的微服务通过订阅事件的方式进行通信和协作。

  4. 网络爬虫和数据采集:Kafka可以用于构建高可靠的网络爬虫系统和数据采集系统。爬虫可以将抓取的数据写入Kafka队列,然后其他系统可以消费这些数据进行进一步的处理和分析。

  5. 消息系统和通信中间件:Kafka提供了可靠的消息传递机制,可以作为消息系统和通信中间件,用于构建分布式系统、实现异步通信和跨系统的数据传输。

总之,Kafka消息队列的应用场景非常广泛,适用于大数据处理、实时数据流处理、异步通信等各种场景。它具有高性能、可靠性和可扩展性的特点,可以帮助解决数据流处理和消息传递的各种问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/40957.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

React(2)

题外话&#xff1a;vscode有个插件可以很方便的快速写代码 输入rcc回车 1.组件嵌套 import React, { Component } from reactclass Navbar extends Component{render(){return <div>Navbar</div>} }const Swiper()>{return <div>Swiper</div> }cons…

适合小公司的自动化部署脚本

背景&#xff08;偷懒&#xff09; 在小小的公司里面&#xff0c;挖呀挖呀挖。快挖不动了&#xff0c;一件事重复个5次&#xff0c;还在人肉手工&#xff0c;身体和心理就开始不舒服了&#xff0c;并且违背了个人的座右铭&#xff1a;“偷懒”是人类进步的第一推动力。 每次想…

资深测试总结,自动化测试-JSON+YAML+CSV+Excel数据驱动(详细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 数据驱动 在自动…

pytorch深度学习逻辑回归 logistic regression

# logistic regression 二分类 # 导入pytorch 和 torchvision import numpy as np import torch import torchvision from torch.autograd import Variable import torch.nn as nn import torch.nn.functional as F import torch.optim as optim import matplotlib.pyplot as …

android editText获取不到数据

问题分析&#xff1a;在onActivityCreated一开始就创建了findViewById&#xff0c;这时获取的是默认值&#xff0c;需要在点击按钮时重新加载才能获取到输入数据。 需要在点击按钮时重新加载数据&#xff1a;

ORACLE TO POSTGRESQL 来自2天上海的印象

开头还是介绍一下群&#xff0c;如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;在新加的朋友会分到2群&#xff08;共…

.NET网络编程——TCP通信

一、网络编程的基本概念 : 1. 网络 就是将不同区域的电脑连接到一起&#xff0c;组成局域网、城域网或广域网。把分部在不同地理区域的计算机于专门的外部设备用通信线路 互联成一个规模大、功能强的网络系统&#xff0c;从而使众多的计算机可以方便地互相传递信息&#xff0c…

netty组件详解-上

netty服务端示例: private void doStart() throws InterruptedException {System.out.println("netty服务已启动");// 线程组EventLoopGroup group new NioEventLoopGroup();try {// 创建服务器端引导类ServerBootstrap server new ServerBootstrap();// 初始化服…

了解交换机接口的链路类型(access、trunk、hybrid)

上一个章节中讲到了vlan的作用及使用&#xff0c;这篇了解一下交换机接口的链路类型和什么情况下使用 vlan在数据包中是如何体现的&#xff0c;在上一篇的时候提到测试了一下&#xff0c;从PC1去访问PC4的时候&#xff0c;只从E0/0/2发送给了E0/0/3这是&#xff0c;因为两个接…

【SpringⅡ】简单高效地存储读取对象

目录 &#x1f9e5;1 配置扫描路径 &#x1f9e4;2 类注解实现 Bean 对象的存储 &#x1fa71;2.1 五大类注解的使用 &#x1f381;2.2 五大类注解之间的关系 &#x1f38f;2.3 Java 项目的标准分层 &#x1f383;3 方法注解实现 Bean 对象的存储 &#x1f388;3.1 Bean…

【论文阅读】一些多轮对话文章的体会 ACL 2023

前言 本文是对昨天看到的ACL 2023三篇多轮对话文章的分享这三个工作都是根据一些额外属性控制输出的工作&#xff0c;且评估的方面比较相似&#xff0c;可以借鉴 方法 这几篇文章都不是做general任务的&#xff0c;倾向于通过一些额外信息&#xff0c;来做specific任务 【1】…

【ceph】存储池pg个数如何设置

存储池pg个数如何设置 参考官方文档说明&#xff1a;https://old.ceph.com/pgcalc/参数说明TargePGs per OSD&#xff1a;每个OSD的pg数OSD#存储池包含osd个数%Data存储池写入数据占总OSD容量百分比Size存储池冗余数

vue2watch监听遇到的问题

1 vue 父组件里引入子组件 显示与隐藏是v-if控制时 父传入子的参数通过watch 监听请求接口时 watch 时而监听不到 请求接口的参数就不对 如图 父组件这么引入子组件v-show 和v-if 是有区别的 2 子组件通过watch 监听后 清空页面要展示的列表数据 重新从第一页加载数据&#x…

微服务sleuth+zipkin——链路追踪

一、链路追踪&#x1f349; 1.什么是链路追踪&#xff1f;&#x1f95d; 在大型系统的微服务化构建中&#xff0c;一个系统被拆分成了许多模块。这些模块负责不同的功能&#xff0c;组合成系统&#xff0c;最终可以提供丰富的功能。在这种架构中&#xff0c;一次请求往往需要…

简单了解UML类图

前言 大话设计中&#xff0c;多次使用UML类图来表示&#xff0c;并也给了基本的介绍&#xff0c;这里从书中选出UML图和代码做成笔记&#xff0c;以方便查找。 1、类 注意前面的符号&#xff1a; &#xff1a;public -&#xff1a;private #&#xff1a;protected 抽象类&…

在阿里云linux上安装MySql数据库

我们先远程连接服务器 然后输入 sudo yum update重新运行一下 然后 sudo yum install mysql-server安装 mysql 服务 其中有两次 y n 选择 都选y就好了 然后 运行 sudo service mysqld start启动MySql 然后 我们查看一下MySql sudo service mysqld status

Debian 12上如何关闭nobody共享文件夹,一个能让INSCODE AI 创作助手不知所措的小问题

这个问题之前在Debian 10和11上都没有遇到过&#xff0c;换上Debian 12后Samba的设置就出现了状况&#xff0c;装上Samba后什么都没有设置就在局域网可以看到&#xff1a; 根据之前的经验在/etc/samba/smb.conf里查了很久也没有看出所以然来&#xff0c;后来又问了INSCODE AI…

[洛谷]P8662 [蓝桥杯 2018 省 AB] 全球变暖(dfs)

读题不规范&#xff0c;做题两年半&#xff01; 注意&#xff1a;被海水淹没后的陆地应用另一个字符表示&#xff0c;而不是把它变为海洋&#xff0c;这个点可以便利&#xff0c;但不能被当作起点&#xff0c;不然就只有 36 分。 ACocde: #include<bits/stdc.h> using…

静态数码管显示

学习芯片&#xff1a; EP4CE6F17C8 本次学习使用的为共阴极数码管&#xff0c;即用低电平点亮数码管&#xff0c;同样可知&#xff0c;共阳极数码管的阳极连在一起&#xff0c;即用高电平点亮数码管。 八段数码管示意图&#xff1a; a,b,c,d,e,f,g,dg表示八段数码管时&#…

微服务sleuth+zipkin---链路追踪+nacos配置中心

目录 1.分布式链路追踪 1.1.链路追踪Sleuth介绍 1.2.如何完成sleuth 1.3.zipkin服务器 2.配置中心 2.1.常见配置中心组件 2.2.微服务集群共享一个配置文件 2.2.1实时刷新--配置中心数据 2.2.2.手动写一个实时刷新的配置类 ----刷新配置文件 2.3.多个微服务公用一个配…