【.NET】Kafka消息队列介绍,使用Confluent.Kafka集成Kafka消息队列

一、Kafka介绍

kafka是一种高吞吐量、分布式、可扩展的消息中间件系统,最初由LinkedIn公司开发。随着不断的发展,在最新的版本中它定义为分布式的流处理平台,现在在大数据应用中也是十分广泛。

它可以处理大量的实时数据流,被广泛应用于日志收集、事件处理、流处理、消息队列等场景。

Kafka的架构包含producer(生产者)、consumer(消费者)、broker(代理服务器)等组件。生产者可以将消息发送到Kafka集群,消费者可以从Kafka集群订阅消息并进行处理,而broker则是消息的中转服务器,负责存储和转发消息。

Kafka的特点包括:

  • 高吞吐量:Kafka可以处理海量的数据流,支持每秒百万级别的消息处理。
  • 可扩展性:Kafka的集群可以根据需要进行水平扩展,从而提高系统的性能和容量。
  • 可靠性:Kafka支持多副本机制,可以保证数据的可靠性和高可用性。
  • 灵活性:Kafka支持多种消息格式和协议,可以与各种系统和工具进行集成。
  • Kafka是一个开源的项目,已经成为了Apache软件基金会的顶级项目.
Kafka & 核心概念

接着,我们看下它的核心概念,这些概念都很重要,在后边的学习中都会遇到,概念一定要搞明白,对于理解Kafka的工作原理和使用方法非常重要。不然学习起来比较懵, 下面一起看一下核心概念:

Topic
Topic是消息的逻辑容器,用于对消息进行分类和存储。在Kafka中,消息会被发布到指定的topic中,并且可以被一个或多个消费者订阅。Topic是Kafka的核心概念之一,是实现消息传递的基础。

Producer
Producer是消息的生产者,用于向指定的topic中发送消息。Producer负责将消息发送到Kafka集群中的broker节点,并且可以在发送消息时指定消息的key,以便Kafka将消息分配到指定的partition中。

Consumer
Consumer是消息的消费者,用于从指定的topic中接收消息。Consumer负责从Kafka集群中的broker节点获取消息,并且可以指定从哪个partition中获取消息。消费者可以以不同的方式进行消息消费,例如批量消费、轮询消费等。

Broker
Broker是Kafka集群中的一个节点,用于存储和管理消息。Broker是Kafka的核心组件之一,负责接收和处理生产者发送的消息,并将其存储到磁盘中,同时还负责将消息转发给消费者。

Partition
Partition是Kafka中实现数据分片的机制,一个topic可以被分成多个partition,每个partition都是一个有序的消息队列。消息在被发送到一个topic时,会被根据指定的key进行hash计算,然后被分配到对应的partition中。

Offset
Offset是Kafka中的一个重要概念,用于标识每个消息在一个partition中的位置。每个partition都有一个唯一的offset值,消费者可以根据offset来获取指定位置的消息。Kafka还提供了一种特殊的topic,称为__consumer_offsets,用于存储消费者消费的位置信息。

参考:https://zhuanlan.zhihu.com/p/612327585

二、C#引用Confluent.Kafka.dll实现kafka消息队列的实际开发例子

在这里插入图片描述

1、配置文件

  "KafkaConfig": {
    "BootstrapServers": "", 
    "SaslUsername": "",
    "SaslPassword": ""
  },

2、配置类: BusinessOptionsSetting.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace IntegratedPlatform.Domain.Dtos.OptionSetting
{
    /// <summary>
    /// 业务配置
    /// </summary>
    public class BusinessOptionsSetting
    {
        /// <summary>
        /// kafka 配置
        /// </summary>
        public KafkaConfig KafkaConfig { get; set; }
    }

    /// <summary>
    /// kafka 配置
    /// </summary>
    public class KafkaConfig 
    { 
        /// <summary>
        /// 服务端配置
        /// </summary>
        public string BootstrapServers { get; set; }

        /// <summary>
        /// Sasl用户名
        /// </summary>
        public string SaslUsername { get; set; }

        /// <summary>
        /// Sasl密码
        /// </summary>
        public string SaslPassword { get; set; }
    }
}

3、kafka服务类: KafkaServices.cs

using Confluent.Kafka;
using IntegratedPlatform.Domain.Dtos.OptionSetting;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using NPOI.XWPF.UserModel;
using Serilog;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography.Xml;
using System.Text;
using System.Threading.Tasks;
using static Confluent.Kafka.ConfigPropertyNames;

namespace IntegratedPlatform.Infrastructure.Kafka
{
    /// <summary>
    /// kafka服务类
    /// </summary>
    public class KafkaServices
    {
        private BusinessOptionsSetting options;
        public KafkaServices(IOptions<BusinessOptionsSetting> _options) 
        {
            options = _options.Value;
        }

        /// <summary>
	    /// 发送消息至指定主题
	    /// </summary>
	    /// <param name="topicName">主题名称</param>
	    /// <param name="message">消息内容</param>
	    /// <returns>异步任务</returns>
        public async Task PublishMessageAsync(string topicName, string message)
        {
            var config = new ProducerConfig
	        {
	            BootstrapServers = options.BootstrapServers,
	            EnableIdempotence = true, // 启用幂等性以防止重复发送
	            Acks = Acks.All, // 确保所有副本都收到消息
	            SecurityProtocol = SecurityProtocol.SaslPlaintext,
	            SaslMechanism = SaslMechanism.ScramSha256,
	            SaslUsername = options.SaslUsername,
	            SaslPassword = options.SaslPassword,
	            BatchNumMessages = 1, // 每个批次发送一条消息
	            AllowAutoCreateTopics = true, // 允许自动创建主题
	            MessageSendMaxRetries = 3, // 最大重试次数
	        };

            using (var producer = new ProducerBuilder<string, string>(config)
                 //.SetValueSerializer(new CustomStringSerializer<string>())
                 .Build())
            {

                try
                {
                    var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Key = "1", Value = message });
                  //  Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
                }
                catch (ProduceException<string, string> e)
                {
                    Log.Error($"failed to deliver message: {e.Message} [{e.Error.Code}]");
                }

            }
        }

        /// <summary>
        /// 从指定主题订阅消息
        /// </summary>
        /// <param name="topics"></param>
        /// <param name="messageFunc"></param>
        /// <param name="cancellationToken"></param>
        /// <param name="groupId"></param>
        /// <returns></returns>
        public void SubscribeAsync(IEnumerable<string> topics, Action<string> messageFunc, CancellationToken cancellationToken, string groupId)
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = options.KafkaConfig.BootstrapServers,
                GroupId = groupId,

                EnableAutoCommit = false,
                StatisticsIntervalMs = 5000,
                SecurityProtocol = SecurityProtocol.SaslPlaintext,
                SaslMechanism = SaslMechanism.ScramSha256,
                SaslUsername = options.KafkaConfig.SaslUsername,
                SaslPassword = options.KafkaConfig.SaslPassword,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true,

            };

            //提交偏移量的时候,也可以批量去提交
            const int commitPeriod = 1;
            using (var consumer = new ConsumerBuilder<Ignore, string>(config)
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}") )
                .SetStatisticsHandler((_, json) =>
                {
                    Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > Kafka消息监听中..");
                })
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    string partitionsStr = string.Join(", ", partitions);
                    Console.WriteLine($" - 分配的 kafka 分区: [{string.Join(", ", partitions)}]");
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    //自定义存储偏移量
                    //1.每次消费完成,把相应的分区id和offset写入到mysql数据库存储
                    //2.从指定分区和偏移量开始拉取数据
                    //分配的时候调用

                    Console.WriteLine($" - 回收了 kafka 的分区:[{string.Join(", ", partitions)}]");
                })
                //.SetValueDeserializer(new CustomStringIDeserializer<T>())
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);

                            if (consumeResult.IsPartitionEOF)
                            {
                                Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            Console.WriteLine($"Kafka接收到消息 at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                            string messageResult = consumeResult.Message.Value;
                            if (!string.IsNullOrEmpty(messageResult) /*&& consumeResult.Offset % commitPeriod == 0*/)
                            {
                                messageFunc(messageResult);
                                try
                                {
                                    consumer.Commit(consumeResult);
                                }
                                catch (KafkaException e)
                                {
                                    Log.Error($"Commit error: {e.Error.Reason}");
                                }
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Log.Error($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException ex)
                {
                    Log.Error($"Closing consumer.{ex.Message}");
                    consumer.Close();
                }

            }
        }

    }
}

4、使用 kafka

  /// <summary>
  /// 用户消息订阅(主题:MSG_UCENTER_PUBLISH_ZYYD)
  /// </summary>
  /// <param name="o"></param>
  public void MsgUserSubscribe(object? o)
  {
      var cts = new CancellationTokenSource();
      Console.CancelKeyPress += (_, e) =>
      {
          e.Cancel = true;
          cts.Cancel();
      };

      _kafkaServices.SubscribeAsync(new List<string>() { EConstant.KafkaTopicName.USER }, async (eventData) =>
      {
          //Console.WriteLine($" - {eventData}】- > 已处理");
          try
          {
              var messageEntity = new KafkaMessage()
              {
                  MType = 1,
                  AcType = 1,
                  TopicName = EConstant.KafkaTopicName.USER,
                  Content = eventData,
                  Status = 0
              };

              var id = kafkaMessageRepository.AddReturnIdentity(messageEntity);
              messageEntity.Id = id;

              JObject jobject = JObject.Parse(eventData);
          }
          catch (Exception e)
          {
              Log.Error($"failed to deliver message: {e.Message}");
          }
          

      }, cts.Token, EConstant.KafkaGroupId.USER);

  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/951340.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Jenkins内修改allure报告名称

背景&#xff1a; 最近使用Jenkins搭建自动化测试环境时&#xff0c;使用Jenkins的allure插件生成的报告&#xff0c;一直显示默认ALLURE REPORT&#xff0c;想自定义成与项目关联的名称&#xff0c;如图所示&#xff0c;很明显自定义名称显得高大上些&#xff0c;之前…

Elasticsearch学习(1) : 简介、索引库操作、文档操作、RestAPI、RestClient操作

目录 1.elasticsearch简介1.1.了解es1.2.倒排索引正向索引和倒排索引 1.3.es的一些概念:文档和字段&#xff1b;索引和映射&#xff1b;Mysql与ES1.4.安装es、kibana部署单点es部署kibanaIK分词器安装IK分词器与测试扩展与停用词词典总结 部署es集群 2.索引库操作2.1.mapping映…

【Linux】Linux常见指令(上)

个人主页~ 初识Linux 一、Linux基本命令1、ls指令2、pwd命令3、cd指令4、touch指令5、mkdir指令6、rmdir指令7、rm指令8、man指令9、cp指令10、mv命令 Linux是一个开源的、稳定的、安全的、灵活的操作系统&#xff0c;Linux下的操作都是通过指令来实现的 一、Linux基本命令 先…

【Java项目】基于SpringBoot的【校园交友系统】

【Java项目】基于SpringBoot的【校园交友系统】 技术简介&#xff1a;系统软件架构选择B/S模式、SpringBoot框架、java技术和MySQL数据库等&#xff0c;总体功能模块运用自顶向下的分层思想。 系统简介&#xff1a;系统主要包括管理员和用户。 (a) 管理员的功能主要有首页、个人…

点击底部的 tabBar 属于 wx.switchTab 跳转方式,目标页面的 onLoad 不会触发(除非是第一次加载)

文章目录 1. tabBar 的跳转方式2. tabBar 跳转的特点3. 你的配置分析4. 生命周期触发情况5. 总结 很多人不明白什么是第一次加载&#xff0c;两种情况讨论&#xff0c;第一种情况假设我是开发者&#xff0c;第一次加载就是指点击微信开发者工具上边的编译按钮&#xff0c;每点击…

什么是Kafka?有什么主要用途?

大家好&#xff0c;我是锋哥。今天分享关于【什么是Kafka&#xff1f;有什么主要用途&#xff1f;】面试题。希望对大家有帮助&#xff1b; 什么是Kafka&#xff1f;有什么主要用途&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Kafka 是一个分布式流…

基于QT和C++的实时日期和时间显示

一、显示在右下角 1、timer.cpp #include "timer.h" #include "ui_timer.h" #include <QStatusBar> #include <QDateTime> #include <QMenuBar> Timer::Timer(QWidget *parent) :QMainWindow(parent),ui(new Ui::Timer) {ui->setup…

单片机-定时器中断

1、相关知识 振荡周期1/12us; //振荡周期又称 S周期或时钟周期&#xff08;晶振周期或外加振荡周期&#xff09;。 状态周期1/6us; 机器周期1us; 指令周期1~4us; ①51单片机有两组定时器/计数器&#xff0c;因为既可以定时&#xff0c;又可以计数&#xff0c;故称之为定时器…

Java 如何传参xml调用接口获取数据

传参和返参的效果图如下&#xff1a; 传参&#xff1a; 返参&#xff1a; 代码实现&#xff1a; 1、最外层类 /*** 外层DATA类*/ XmlRootElement(name "DATA") public class PointsXmlData {private int rltFlag;private int failType;private String failMemo;p…

【C】编译与链接

在本文章里面&#xff0c;我们讲会讲解C语言程序是如何从我们写的代码一步步变成计算机可以执行的二进制指令&#xff0c;并最终执行的。C语言程序运行主要包括两大步骤 -- 编译和链接&#xff0c;接下来我们就来一一讲解。 目录 1 翻译环境和运行环境 2 翻译环境 1&#…

如何设计一个注册中心?以Zookeeper为例

这是小卷对分布式系统架构学习的第8篇文章&#xff0c;在写第2篇文章已经讲过服务发现了&#xff0c;现在就从组件工作原理入手&#xff0c;讲讲注册中心 以下是面试题&#xff1a; 某团面试官&#xff1a;你来说说怎么设计一个注册中心&#xff1f; 我&#xff1a;注册中心嘛&…

Vision Transformer模型详解(附pytorch实现)

写在前面 最近&#xff0c;我在学习Transformer模型在图像领域的应用。图像处理任务一直以来都是深度学习领域的重要研究方向&#xff0c;而传统的卷积神经网络已在许多任务中取得了显著的成绩。然而&#xff0c;近年来&#xff0c;Transformer模型由于其在自然语言处理中的成…

vue实现虚拟列表滚动

<template> <div class"cont"> //box 视图区域Y轴滚动 滚动的是box盒子 滚动条显示的也是因为box<div class"box">//itemBox。 一个空白的盒子 计算高度为所有数据的高度 固定每一条数据高度为50px<div class"itemBox" :st…

Vue指令(下)

Vue指令(下) 参考文献&#xff1a; Vue的快速上手 Vue指令上 文章目录 Vue指令(下)v-bindv-bind小案例v-forv-for小案例v-for中的keyv-model 结语 博客主页: He guolin-CSDN博客 关注我一起学习&#xff0c;一起进步&#xff0c;一起探索编程的无限可能吧&#xff01;让我们…

初学者关于对机器学习的理解

一、机器学习&#xff1a; 1、概念&#xff1a;是指从有限的观测数据中学习(或“猜 测”)出具有一般性的规律&#xff0c;并利用这些规律对未知数据进行预测的方法.机器学 习是人工智能的一个重要分支&#xff0c;并逐渐成为推动人工智能发展的关键因素。 2、使用机器学习模型…

Vue篇-05

5 vuex 5.1 vuex是什么 概念:专门在 Vue 中实现集中式状态(数据)管理的一个Vue 插件&#xff0c;对 vue 应用中多个组件的共享状态进行集中式的管理(读/写)&#xff0c;也是一种组件间通信的方式&#xff0c;且适用于任意组件间通信。Github 地址: https://github.com/vuejs/…

Vue3(elementPlus) el-table替换/隐藏行箭头,点击整行展开

element文档链接&#xff1a; https://element-plus.org/zh-CN/component/form.html 一、el-table表格行展开关闭箭头替换成加减号 注&#xff1a;Vue3在样式中修改箭头图标无效&#xff0c;可能我设置不对&#xff0c;欢迎各位来交流指导 转变思路&#xff1a;隐藏箭头&…

opencv的NLM去噪算法

NLM&#xff08;Non-Local Means&#xff09;去噪算法是一种基于图像块&#xff08;patch&#xff09;相似性的去噪方法。其基本原理是&#xff1a; 图像块相似性&#xff1a;算法首先定义了一个搜索窗口&#xff08;search window&#xff09;&#xff0c;然后在该窗口内寻找…

NineData云原生智能数据管理平台新功能发布|2024年12月版

本月发布 7 项更新&#xff0c;其中重点发布 2 项、功能优化 5 项。 重点发布 数据库 Devops - Oracle 非表对象支持可视化创建与管理 Oracle 非表对象&#xff0c;包括视图&#xff08;View&#xff09;、包&#xff08;Package&#xff09;、存储过程&#xff08;Procedur…

计算机网络 —— 网络编程(TCP)

计算机网络 —— 网络编程&#xff08;TCP&#xff09; TCP和UDP的区别TCP (Transmission Control Protocol)UDP (User Datagram Protocol) 前期准备listen &#xff08;服务端&#xff09;函数原型返回值使用示例注意事项 accpect &#xff08;服务端&#xff09;函数原型返回…