rabbitmq 基本总结

rabbitmq 的基本概念 vhost、broker、producer、 consumer、 exchange、 queue、 routing key

rabbitmq 常用的队列类型,工作队列(简单队列),pub/sub, routing key, topic 模式

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>
public class RabbitmqClientDemo {
    private static ConnectionFactory factory = new ConnectionFactory();

    private static String EXCHANGE_NAME = "exchange.fanout";
    private static String FANOUT_QUEUE = "queue.fanout";
    private static String DIRECT_EXCHANGE = "exchange_direct";
    private static String QUEUE_DIRCT = "queue.direct.02";
    private static String QUEUE_TOPIC_ONE = "queue.topic.01";
    private static String QUEUE_TOPIC_TWO = "queue.topic.02";
    private static String QUEUE_TOPIC_THREE = "queue.topic.03";
    private static String ROUNTING_KEY_ONE = "routing.key.01";
    private static String ROUNTING_KEY_TWO = "routing.key.02";
    private static String ROUNTING_KEY_THREE = "routing.key.03";
    private static String DEAD_MESSAGE_EXCHANGE = "EXCHANGE_DEAD";
    private static String DEAD_QUEUE = "queue.dead";

    static {

        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
    }

    public static Connection getConnection() throws IOException, TimeoutException {
        return factory.newConnection();
    }

    public static Channel createChannel() throws IOException, TimeoutException {
        Connection connection = getConnection();
        return connection.createChannel();
    }

    public static void main(String[] args) {
        //new WorkQueueProducer().start();
        //new WorkerConsumer().start();
      /*  new PublishConsumer().start();
        new PublishProducer().start();*/
        //new TopicProducer().start();
        //new TopicConsumer().start();
        new DeadMessageProducer().start();
        new DeadMessageConsumer().start();
    }

    static class WorkQueueProducer extends Thread {
        @Override
        public void run() {
            try {
                Connection connection = getConnection();
                Channel channel = connection.createChannel();
                channel = connection.createChannel();
                channel.queueDeclare("hello", false, false, false, null);
                channel.basicPublish("", "hello", null, "hello".getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //hannel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class WorkerConsumer extends Thread {

        @Override
        public void run() {
            try {
                Connection connection = getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare("hello", false, false, false, null);
                channel.basicQos(1);
                channel.basicConsume("hello", true, new DeliverCallback() {
                    @Override
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(new String(delivery.getBody()));
                    }
                }, consumerTag -> {
                });
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class PublishProducer extends Thread {
        @Override
        public void run() {
            try {
                Channel channel = createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);
                channel.queueBind(FANOUT_QUEUE, EXCHANGE_NAME, "");
                for (int i = 1; i <= 40; i++) {
                    String message = String.format("current orderId is %d, money is %d", UUID.randomUUID(), new Random().nextDouble());
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class PublishConsumer extends Thread {

        @Override
        public void run() {
            try {
                Channel channel = createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);
                channel.queueBind(FANOUT_QUEUE, EXCHANGE_NAME, "");
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                };
                while (true) {
                    channel.basicConsume(FANOUT_QUEUE, true, deliverCallback, consumerTag -> {
                    });
                }

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class TopicProducer extends Thread {
        @Override
        public void run() {
            try {
                Channel channel = createChannel();
                channel.exchangeDeclare(DIRECT_EXCHANGE, "topic");
                channel.queueDeclare(QUEUE_TOPIC_ONE, true, false, false, null);
                channel.queueDeclare(QUEUE_TOPIC_TWO, true, false, false, null);
                channel.queueDeclare(QUEUE_TOPIC_THREE, true, false, false, null);
                channel.queueBind(QUEUE_TOPIC_ONE, DIRECT_EXCHANGE, ROUNTING_KEY_ONE);
                channel.queueBind(QUEUE_TOPIC_TWO, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);
                channel.queueBind(QUEUE_TOPIC_TWO, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);
                channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_THREE);
                channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_ONE);
                channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);
                for (int i = 1; i <= 10; i++) {
                    String message = String.format("current orderId is %s, money is %s", UUID.randomUUID().toString(), new Random().nextDouble());
                    if (i % 3 == 0) {
                        System.out.println("send to topic1");
                        channel.basicPublish(DIRECT_EXCHANGE, ROUNTING_KEY_ONE, null, message.getBytes(StandardCharsets.UTF_8));
                    } else if (i % 3 == 1) {
                        System.out.println("send to topic2");
                        channel.basicPublish(DIRECT_EXCHANGE, ROUNTING_KEY_TWO, null, message.getBytes(StandardCharsets.UTF_8));
                    } else {
                        System.out.println("send to topic3");
                        channel.basicPublish(DIRECT_EXCHANGE, ROUNTING_KEY_THREE, null, message.getBytes(StandardCharsets.UTF_8));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class TopicConsumer extends Thread {

        @Override
        public void run() {
            try {
                Channel channel = createChannel();
                channel.exchangeDeclare(DIRECT_EXCHANGE, "topic");
                channel.queueDeclare(QUEUE_TOPIC_THREE, true, false, false, null);
                channel.queueBind(QUEUE_TOPIC_THREE,EXCHANGE_NAME,"routing.key.*")
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    System.out.println(delivery.getEnvelope().getRoutingKey());
                    System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                };
                while (true) {
                    channel.basicConsume(QUEUE_TOPIC_THREE, true, deliverCallback, consumerTag -> {
                    });
                }

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    static class DeadMessageProducer extends Thread {
        @Override
        public void run() {
            try {
                Channel channel = createChannel();
                channel.exchangeDeclare(DIRECT_EXCHANGE, "direct");
                channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
                channel.queueBind(DEAD_QUEUE, DEAD_MESSAGE_EXCHANGE, "routing.direct02");
                for (int i = 1; i <= 40; i++) {
                    String message = String.format("current orderId is %s, money is %s", UUID.randomUUID().toString(), new Random().nextDouble());
                    AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().expiration("30000").build();
                    channel.basicPublish(DIRECT_EXCHANGE, "routing.direct02", prop, message.getBytes(StandardCharsets.UTF_8));
                    System.out.println("send to topic1");
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    static class DeadMessageConsumer extends Thread {

        @Override
        public void run() {
            try {
                Channel channel = createChannel();
                /*channel.exchangeDeclare(DEAD_MESSAGE_EXCHANGE, "direct");
                Map<String, Object> deadLetterParams = new HashMap<>(2);
                deadLetterParams.put("x-dead-letter-exchange", DEAD_MESSAGE_EXCHANGE);
                deadLetterParams.put("x-dead-letter-routing-key", "routing.dead02");
                deadLetterParams.put("x-max-length", 2);*/
                /*channel.queueDeclare(QUEUE_DIRCT, true, false, false, deadLetterParams);
                channel.queueBind(DEAD_QUEUE, DEAD_MESSAGE_EXCHANGE, "routing.dead02");*/
                channel.exchangeDeclare(DIRECT_EXCHANGE, "direct");
                channel.queueBind(QUEUE_DIRCT, DIRECT_EXCHANGE, "routing.direct02");
                DeliverCallback callback  = (consumerTag, delivery) -> {
                    System.out.println(delivery.getEnvelope().getRoutingKey());
                    System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                };
               /* DeliverCallback callback = (consumerTag, delivery) -> {
                    String receivedMessage = new String(delivery.getBody());
                    System.out.println("C1接收到消息:" + receivedMessage + "并且拒绝签收了");
                    // 禁止重新入队
                    channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

                };*/

                while (true) {
                    //channel.basicConsume(QUEUE_DIRCT, true, deliverCallback, consumerTag -> {});
                    channel.basicConsume(QUEUE_DIRCT, true, callback, (consumerTag) -> {
                        System.out.println(consumerTag + "消费者取消消费消息");
                    });
                }

            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    //channel.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

整合springboot

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.1.5</version>
</dependency>

rabbitmq 的核心配置(相比于其他的mq,rabbit 有图形用户界面,可以傻瓜操作)

https://blog.csdn.net/leesinbad/article/details/128670794

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/443030.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Excel 快速填充/输入内容

目录 一. Ctrl D/R 向下/右填充二. 批量输入内容 一. Ctrl D/R 向下/右填充 ⏹如下图所示&#xff0c;通过快捷键向下和向右填充数据 &#x1f914;当选中第一个单元格之后&#xff0c;可以按住Shift后&#xff0c;再选中最后一个单元格&#xff0c;可以选中第一个单元格和最…

云计算项目九:K8S安装

K8S安装 Kube-master安装 按照如下配置准备云主机 防火墙相关配置&#xff1a;禁用selinux&#xff0c;禁用swap&#xff0c;且在firewalld-*。上传kubernetes.zip 到跳板机 配置yum仓库&#xff08;跳板机&#xff09; 跳板机主机配置k8s软件源服务端 [rootjs ~]# yum -y…

uniapp踩坑之项目:uni.previewImage简易版预览单图片

主要使用uni.previewImage //html <view class"box-card" v-for"(item,index) in DataList" :key"index"><view>图片&#xff1a;</view><image :src"item.Path" tap.stop"clickImg(item.Path)">&l…

JavaScript数组方法常用方法大全

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 1. push()2. pop()3. unshift()4. shift()5. isArray()6. map()7. filter()8. every()9. some()10. splice()11. slice()12. indexOf()13. includes()14. concat()1…

Python和Google Colab进行卫星图像二维小波变化和机器学习

2D 小波分解是图像处理中的一种流行技术,使用不同的滤波器将图像分解为不同的频率分量(“近似”和“细节”系数)。该技术对于各种图像处理任务特别有用,例如压缩、去噪、特征提取和边缘检测。 在本文中,我们将演示如何在 Google Colab 中使用 Python 下载高分辨率样本卫星…

弄清Resource、ResourceLoader、容器之间的微妙关系

Resource&#xff1a; 在Java中&#xff0c;资源会被抽象成url&#xff0c;通过url前面的协议&#xff08;如file&#xff1a;&#xff0c;classpath&#xff1a;&#xff09;来处理不同的操作逻辑&#xff0c;resource是一个接口 Resource家族 具体的资源访问由其实现类完成&a…

Python学习日记之学习turtle库(下 篇)

前言&#xff1a; 书接上篇&#xff0c;我们继续来了解Python内置库 turtle功能库。在前面的文章中&#xff0c;我们初步的了解了一下 turtle库&#xff0c;画布和画笔相关的函数&#xff0c;那么我们继续来深入了解一下吧。 详情见&#xff1a; Python学习日记之学习turtle库…

Spring web MVC(入门)

1、什么是MVC&#xff08;一种思想&#xff09; Model View Controller &#xff1a; Model—模型 View—视图 Controller—控制器 2、Spring MVC是一种实现&#xff08;我们现在学的是Spring web,Spring mvc过时了&#xff09; View属于前端问题我们后端人员不必太过于关注…

Prometheus添加nginx节点显示不支持stub_status 解决办法

1、我们在使用Prometheus监控添加nginx节点监控的时候&#xff0c;在被监控节点的nginx配置文件中添加下面的模块 server { listen 80; server_name localhost; location /stub_status { stub_status on; access_log off; …

Leetcode 73. 矩阵置零

给定一个 m x n 的矩阵&#xff0c;如果一个元素为0&#xff0c;则将其所在行和列的所有元素都设为 0 。请使用原地算法。 示例 1&#xff1a; 输入&#xff1a;matrix [[1,1,1],[1,0,1],[1,1,1]] 输出&#xff1a;[[1,0,1],[0,0,0],[1,0,1]] 示例 2&#xff1a; 输入&a…

力扣hot100题解(python版63-68题)

63、搜索插入位置 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 示例 1: 输入: nums [1,3,5,6], target 5 输…

在Blender中清理由Instant-NGP等几何学习技术生成的网格

使用布尔运算: 创建一个大的立方体或其他简单几何体包裹住全部网格。使用布尔修改器对两个网格进行“差集”运算。这将移除超出包裹体之外的多余网格部分。 手动选择并删除: 进入编辑模式&#xff08;按Tab键&#xff09;。按A键取消选择所有顶点。按B键并拖动以选择您想要删除…

【论文笔记】Language Models are Few-Shot Learners

Language Models are Few-Shot Learners 本部分是 GPT-3 技术报告的第一部分&#xff1a;论文正文、部分附录。 后续还有第二部分&#xff1a;GPT-3 的广泛影响、剩下的附录。 以及第三部分&#xff08;自己感兴趣的&#xff09;&#xff1a;GPT-3 的数据集重叠性研究。 回顾…

vue2 div滚动条下拉到底部时触发事件(懒加载) 超级简易版本的懒加载

文章目录 导文文章重点内容效果展示&#xff1a;代码展示这些方法适用于哪些场景 总结 导文 vue2 div滚动条下拉到底部时触发事件(懒加载) 超级简易版本的懒加载 文章重点 内容效果展示&#xff1a; 当div拉到底部的时候&#xff1a; 编辑器返回&#xff1a; 代码展示 在…

来点基础的吧,JavaScript、JSP怎么打印输出,方便调试

这个对初学者肯定有用&#xff0c;自己写了代码&#xff0c;想看看对不对&#xff0c;想打印到页面上看看&#xff0c;都有哪些地方需要打印用哪些方法呢&#xff1f; 一、JavaScript的打印输出 1、console.log() console.log()是JavaScript中最常用的打印值方法之一。它将指…

React-router之简单使用

1.概念 说明&#xff1a;页面的跳转 2.安装 说明&#xff1a;路由采用CRA创建项目的方式进行基础环境配置。 npx create-react-app react-router-pro npm i react-router-dom 3.使用 import React from react; import ReactDOM from react-dom/client; import ./index.css;…

嵌入式学习第二十五天!(网络的概念、UDP编程)

网络&#xff1a; 可以用来&#xff1a;数据传输、数据共享 1. 网络协议模型&#xff1a; 1. OSI协议模型&#xff1a; 应用层实际收发的数据表示层发送的数据是否加密会话层是否建立会话连接传输层数据传输的方式&#xff08;数据包&#xff0c;流式&#xff09;网络层数据的…

C#学习:初识各类应用程序

编写我们第一个程序——Hello,World! 1.编程不是“学”出来的&#xff0c;而是“练”出来的 2.在反复应用中积累&#xff0c;忽然有一天就会顿悟 3.学习原则&#xff1a; 3.1从感官到原理 3.2从使用别人的到创建自己的 3.3必需亲自动手 3.4必需学以致用&#xff0c;紧跟实际…

大模型思维链(CoT prompting)

思维链&#xff08;Chain of Thought&#xff0c;CoT&#xff09; **CoT 提示过程是一种大模型提示方法&#xff0c;它鼓励大语言模型解释其推理过程。**思维链的主要思想是通过向大语言模型展示一些少量的 exapmles&#xff0c;在样例中解释推理过程&#xff0c;大语言模型在…

HTML 学习笔记(七)列表

html中的列表分为以下三种&#xff1a;有序列表&#xff0c;无序列表和自定义列表 1.有序列表 有序列表由两个元素组成&#xff1a;元素ol和元素li&#xff0c;此两个元素是父子关系&#xff0c;li必须包裹在ol里使用&#xff0c; ol里直接嵌套的只有li&#xff0c;其嵌套效果…