RabbitMQ深度探索:简单实现 MQ

基于多线程队列实现 MQ :

  1. 实现类:
    public class ThreadMQ {
        private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();
    
        public static void main(String[] args) {
            //创建生产者线程
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            Thread.sleep(1000);
                            JSONObject data = new JSONObject();
                            data.put("phone","11111111");
                            broker.offer(data);
                        }catch (Exception e){
    
                        }
                    }
                }
            },"生产者");
    
            producer.start();
            Thread consumer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            JSONObject data = broker.poll();
                            if(data != null){
                                System.out.println(Thread.currentThread().getName() + data.toJSONString());
                            }
                        }catch (Exception e){
    
                        }
                    }
                }
            },"消费者");
            consumer.start();
    
        }
    }

基于 netty 实现 MQ:

  1. 执行过程:
    1. 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
    2. 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
  2. 执行流程:
    1. 导入 Maven 依赖:
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.0.23.Final</version>
      </dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.11</version>
      </dependency>
      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.6.5</version>
      </dependency>
    2. 服务端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import org.apache.commons.lang3.StringUtils;
      
      import java.io.UnsupportedEncodingException;
      import java.util.ArrayList;
      import java.util.concurrent.LinkedBlockingDeque;
      
      /**
       * @ClassName BoyatopMQServer2021
       * @Author
       * @Version V1.0
       **/
      public class BoyatopNettyMQServer {
          public void bind(int port) throws Exception {
              /**
               * Netty 抽象出两组线程池BossGroup和WorkerGroup
               * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
               */
              EventLoopGroup bossGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();
              ServerBootstrap bootstrap = new ServerBootstrap();
              try {
                  bootstrap.group(bossGroup, workerGroup)
                          // 设定NioServerSocketChannel 为服务器端
                          .channel(NioServerSocketChannel.class)
                          //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
                          //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                          .option(ChannelOption.SO_BACKLOG, 100)
                          // 服务器端监听数据回调Handler
                          .childHandler(new BoyatopNettyMQServer.ChildChannelHandler());
                  //绑定端口, 同步等待成功;
                  ChannelFuture future = bootstrap.bind(port).sync();
                  System.out.println("当前服务器端启动成功...");
                  //等待服务端监听端口关闭
                  future.channel().closeFuture().sync();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //优雅关闭 线程组
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      
          private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                  // 设置异步回调监听
                  ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());
              }
          }
      
          public static void main(String[] args) throws Exception {
              int port = 9008;
              new BoyatopNettyMQServer().bind(port);
          }
      
          private static final String type_consumer = "consumer";
      
          private static final String type_producer = "producer";
          private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
          private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
      
          // 生产者投递消息的:topicName
          public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
      
              /**
               * 服务器接收客户端请求
               *
               * @param ctx
               * @param data
               * @throws Exception
               */
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, Object data)
                      throws Exception {
                  //ByteBuf buf=(ByteBuf)data;
                  //byte[] req = new byte[buf.readableBytes()];
                  //buf.readBytes(req);
                  //String body = new String(req, "UTF-8");
                  //System.out.println("body:"+body);
                  JSONObject clientMsg = getData(data);
                  String type = clientMsg.getString("type");
                  switch (type) {
                      case type_producer:
                          producer(clientMsg);
                          break;
                      case type_consumer:
                          consumer(ctx);
                          break;
                  }
              }
      
              private void consumer(ChannelHandlerContext ctx) {
                  // 保存消费者连接
                  ctxs.add(ctx);
                  // 主动拉取mq服务器端缓存中没有被消费的消息
                  String data = msgs.poll();
                  if (StringUtils.isEmpty(data)) {
                      return;
                  }
                  // 将该消息发送给消费者
                  byte[] req = data.getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              private void producer(JSONObject clientMsg) {
                  // 缓存生产者投递 消息
                  String msg = clientMsg.getString("msg");
                  msgs.offer(msg); //保证消息不丢失还可以缓存硬盘
      
                  //需要将该消息推送消费者
                  ctxs.forEach((ctx) -> {
                      // 将该消息发送给消费者
                      String data = msgs.poll();
                      if (data == null) {
                          return;
                      }
                      byte[] req = data.getBytes();
                      ByteBuf firstMSG = Unpooled.buffer(req.length);
                      firstMSG.writeBytes(req);
                      ctx.writeAndFlush(firstMSG);
                  });
              }
      
              private JSONObject getData(Object data) throws UnsupportedEncodingException {
                  ByteBuf buf = (ByteBuf) data;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  return JSONObject.parseObject(body);
              }
      
      
              @Override
              public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                  ctx.flush();
              }
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                      throws Exception {
      
                  ctx.close();
              }
          }
      }
    3. 生产端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      
      /**
       * @ClassName BoyatopNettyMQProducer
       * @Author
       * @Version V1.0
       **/
      public class BoyatopNettyMQProducer {
          public void connect(int port, String host) throws Exception {
              //配置客户端NIO 线程组
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap client = new Bootstrap();
              try {
                  client.group(group)
                          // 设置为Netty客户端
                          .channel(NioSocketChannel.class)
                          /**
                           * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                           * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                           * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                           */
                          .option(ChannelOption.TCP_NODELAY, true)
                          .handler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());
                                  1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());
                              }
                          });
      
                  //绑定端口, 异步连接操作
                  ChannelFuture future = client.connect(host, port).sync();
                  //等待客户端连接端口关闭
                  future.channel().closeFuture().sync();
              } finally {
                  //优雅关闭 线程组
                  group.shutdownGracefully();
              }
          }
      
          public static void main(String[] args) {
              int port = 9008;
              BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();
              try {
                  client.connect(port, "127.0.0.1");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
          public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      
      
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  JSONObject data = new JSONObject();
                  data.put("type", "producer");
                  JSONObject msg = new JSONObject();
                  msg.put("userId", "123456");
                  msg.put("age", "23");
                  data.put("msg", msg);
                  // 生产发送数据
                  byte[] req = data.toJSONString().getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              /**
               * 客户端读取到服务器端数据
               *
               * @param ctx
               * @param msg
               * @throws Exception
               */
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  ByteBuf buf = (ByteBuf) msg;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  System.out.println("客户端接收到服务器端请求:" + body);
              }
      
              // tcp属于双向传输
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
      }
    4. 客户端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      
      /**
       * @ClassName BoyatopNettyMQProducer
       * @Author
       * @Version V1.0
       **/
      public class NettyMQConsumer {
          public void connect(int port, String host) throws Exception {
              //配置客户端NIO 线程组
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap client = new Bootstrap();
              try {
                  client.group(group)
                          // 设置为Netty客户端
                          .channel(NioSocketChannel.class)
                          /**
                           * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                           * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                           * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                           */
                          .option(ChannelOption.TCP_NODELAY, true)
                          .handler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());
                                  1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());
                              }
                          });
      
                  //绑定端口, 异步连接操作
                  ChannelFuture future = client.connect(host, port).sync();
                  //等待客户端连接端口关闭
                  future.channel().closeFuture().sync();
              } finally {
                  //优雅关闭 线程组
                  group.shutdownGracefully();
              }
          }
      
          public static void main(String[] args) {
              int port = 9008;
              NettyMQConsumer client = new NettyMQConsumer();
              try {
                  client.connect(port, "127.0.0.1");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
          public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      
      
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  JSONObject data = new JSONObject();
                  data.put("type", "consumer");
                  // 生产发送数据
                  byte[] req = data.toJSONString().getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              /**
               * 客户端读取到服务器端数据
               *
               * @param ctx
               * @param msg
               * @throws Exception
               */
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  ByteBuf buf = (ByteBuf) msg;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  System.out.println("客户端接收到服务器端请求:" + body);
              }
      
              // tcp属于双向传输
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
      }
  3. 持久化机制:
    1. 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
    2. 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
  4. MQ 服务器将该消息推送给消费者:
    1. 消费者已经和 MQ 服务器保持长连接
    2. 消费者在第一次启动的时候会主动拉取信息
  5. MQ 如何实现高并发思想:
    1. MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
    2. 默认的情况下取出一条消息
  6. 缺点:
    1. 存在延迟问题
  7. 需要考虑 MQ 消费者提高速率的问题:
    1. 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/964878.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

设备通过国标GB28181接入EasyCVR,显示在线但视频无法播放的原因排查

安防监控EasyCVR平台支持多种视频源接入&#xff0c;包括但不限于IP摄像头、NVR、编码器、流媒体服务器等。平台采用高效的视频流接入技术&#xff0c;支持大规模视频流的并发接入&#xff0c;确保视频流的稳定性和流畅性。 有用户反馈&#xff0c;项目现场使用国标GB28181接入…

Electricity Market Optimization 探索系列(三)

本文参考链接link 电网容量规划是一个寻求最优发电容量的过程&#xff0c;找到的最优发电容量能够可靠地满足未来电网的需求 发电机的容量和发电成本呈正相关关系&#xff0c;一台发电机的发电量不能超过其额定发电容量&#xff0c;结合我之前的博客所说的内容&#xff0c;可…

深入理解和使用定时线程池ScheduledThreadPoolExecutor

文章目录 前言认识定时线程池什么是定时线程池&#xff1f;定时线程池基本API使用定时线程池的应用场景1、定时任务调度2、缓存过期清理3、心跳检测4、延迟任务执行 定时线程池scheduleAtFixedRate与scheduleWithFixedDelay区别scheduleAtFixedRate案例demo&#xff08;period&…

在Mac mini M4上部署DeepSeek R1本地大模型

在Mac mini M4上部署DeepSeek R1本地大模型 安装ollama 本地部署&#xff0c;我们可以通过Ollama来进行安装 Ollama 官方版&#xff1a;【点击前往】 Web UI 控制端【点击安装】 如何在MacOS上更换Ollama的模型位置 默认安装时&#xff0c;OLLAMA_MODELS 位置在"~/.o…

动态规划练习九(完全背包问题)

一、问题介绍与解题心得 完全背包问题与01背包问题很相似&#xff0c;不同点就是每个物品数量有多个&#xff0c;每个物品可以取多个或不取&#xff0c;来达到收益最大&#xff0c;或者收益在某个值。 限制条件&#xff1a;背包容量有限 解决问题&#xff1a;从价值入手&…

百亿大表的实时分析:华安基金 HTAP 数据库的选型历程与 TiDB 使用体验

导读 在金融科技迅猛发展的今天&#xff0c;华安基金作为行业的先行者&#xff0c;面临着数据管理和分析的全新挑战。随着业务的不断扩展和数据量的激增&#xff0c;传统的数据库架构已难以满足系统对实时性、灵活性和分析能力的需求。在这样的背景下&#xff0c;HTAP&#xf…

低代码系统-产品架构案例介绍、蓝凌(十三)

蓝凌低代码系统&#xff0c;依旧是从下到上&#xff0c;从左至右的顺序。 技术平台h/iPaas 指低层使用了哪些技术&#xff0c;例如&#xff1a;微服务架构&#xff0c;MySql数据库。个人认为&#xff0c;如果是市场的主流&#xff0c;就没必要赘述了。 新一代门户 门户设计器&a…

DeepSeek研究员在线爆料:R1训练仅用两到三周,春节期间观察到R1 zero强大进化

内容提要 刚刚我注意到DeepSeek研究员Daya Guo回复了网友有关DeepSeek R1的一些问题&#xff0c;以及接下来的公司的计划&#xff0c;只能说DeepSeek的R1仅仅只是开始&#xff0c;内部研究还在快速推进&#xff0c;DeepSeek 的研究员过年都没歇&#xff0c;一直在爆肝推进研究…

【Rust自学】20.1. 最后的项目:单线程Web服务器

喜欢的话别忘了点赞、收藏加关注哦&#xff08;加关注即可阅读全文&#xff09;&#xff0c;对接下来的教程有兴趣的可以关注专栏。谢谢喵&#xff01;(&#xff65;ω&#xff65;) 20.1.1. 什么是TCP和HTTP Web 服务器涉及的两个主要协议是超文本传输​​协议(Hypertext T…

19.[前端开发]Day19-王者荣项目耀实战(二)

01_(掌握)王者荣耀-main-banner展示实现 完整代码 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewpor…

Java 基于微信小程序的高校失物招领平台小程序(附源码,文档)

博主介绍&#xff1a;✌程序员徐师兄、8年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战*✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447…

题解:洛谷 P5837 [USACO19DEC] Milk Pumping G

题目https://www.luogu.com.cn/problem/P5837 温馨提示&#xff1a;鉴于数据范围小的可怜&#xff0c;我们可以用暴力一些的想法去做&#xff0c;别看到是普及/提高就被吓退了。 枚举最小流量 &#xff0c;然后跑一遍最短路&#xff0c;求出带限制的 到 的最短路的长度&#…

动态规划——斐波那契数列模型问题

文章目录 1137. 第 N 个泰波那契数算法原理代码实现 面试题 08.01. 三步问题算法原理代码实现 746. 使用最小花费爬楼梯算法原理代码实现 91. 解码方法算法原理代码实现 1137. 第 N 个泰波那契数 题目链接&#xff1a;1137. 第 N 个泰波那契数 算法原理 状态表示&#xff1a;…

LabVIEW涡轮诊断系统

一、项目背景与行业痛点 涡轮机械是发电厂、航空发动机、石油化工等领域的核心动力设备&#xff0c;其运行状态直接关系到生产安全与经济效益。据统计&#xff0c;涡轮故障导致的非计划停机可造成每小时数十万元的经济损失&#xff0c;且突发故障可能引发严重安全事故。传统人…

java程序员面试自身优缺点,详细说明

程序员面试大厂经常被问到的Java异常机制问题,你搞懂了吗运行时异常:运行时异常是可能被程序员避免的异常。与检查性相反,运行时异常可以在编译时被忽略。错误(ERROR):错误不是异常,而是脱离程序员控制的问题。错误通常在代码中容易被忽略。例如:当栈溢出时,一个错误就发生了,它…

大话特征工程:3.特征扩展

公元 2147 年&#xff0c;人类文明站在科技的巅峰&#xff0c;所有决策、发展甚至感知都被“全维计算网络”所掌控。这套系统以高维空间中的数据为基础&#xff0c;试图预测并塑造未来。然而&#xff0c;这场辉煌的技术革命却在悄无声息之间酿成了人类最大的危机——维数灾难。…

CSV数据分析智能工具(基于OpenAI API和streamlit)

utils.py&#xff1a; from langchain_openai import ChatOpenAI from langchain_experimental.agents.agent_toolkits import create_csv_agent import jsonPROMPT_TEMPLATE """你是一位数据分析助手&#xff0c;你的回应内容取决于用户的请求内容。1. 对于文…

2025.2.5

Web [SWPUCTF 2021 新生赛]ez_unserialize: 这个题先了解一下反序列化&#xff1a;反序列化是序列化的逆过程。序列化是将对象或数据结构转换为可以存储或传输的格式&#xff08;如JSON、XML或二进制格式&#xff09;的过程。反序列化则是将这个格式的数据转换回原始的对象或…

新版AndroidStudio 修改 jdk版本

一、问题 之前&#xff0c;在安卓项目中配置JDK和Gradle的过程非常直观&#xff0c;只需要进入Android Studio的File菜单中的Project Structure即可进行设置&#xff0c;十分方便。 如下图可以在这修改JDK: 但是升级AndroidStudio之后&#xff0c;比如我升级到了Android Stu…

Web3技术详解

Web3技术代表着互联网技术的最新进展&#xff0c;它致力于打造一个去中心化的互联网生态系统。以下是对Web3技术的详细解析&#xff1a; 一、Web3技术的核心概念 Web3是第三代互联网技术的代名词&#xff0c;代表着去中心化、区块链驱动和用户自有控制的理念。在Web3的世界中…