一.概述与安装
//RabbitMQ //1.核心部分-高级部分-集群部分 //2.什么是MQ 消息队列message queue 先入先出原则;消息通信服务 //3.MQ的大三功能 流量消峰 应用解耦 消息中间件 //(1)人-订单系统(1万次/S)—> 人 - MQ(流量消峰,对访问人员进行排队) -订单系统(保护系统不宕机) //(2)订单系统-支付/库存/物流系统—> 订单系统-MQ-支付/库存/物流系统 //(3)A -API -B —> A - MQ -B 这样可以通过MQ完成时告知A //4.MQ的分类 ActiveMQ(老) Kafka(大数据的杀手锏) RocketMQ RabbitMQ(中小型公司推荐) //5.RabbitMQ概念 //(1)就是一个快递站 发件人-快递员-快递站MQ-快递员-收件人 //(2)生产者-MQ(1交换机、N队列)-1个队列对应1个消费者 //6.核心部分 6大模式 //(1)简单模式Hello World //(2)工作模式Work queues //(3)发布订阅模式Publish/Subscribe //(4)路由模式Routing //(5)主体模式Topics //(6)发布确认模式Publisher Confirms //(7)Broker消息实体(可以有多个交换机Exchange(可以有多个队列Queue));Connection(多个Channel) // Producer生产者 Consumer消费者 Binding绑定,交换机和queue之间的虚拟连接 //7.RMQ的安装 //集中下载 链接:https://pan.baidu.com/s/1NJfYnLT4DN-uu-uyIXzA4w 提取码:HIT0 //(1)先安装erlang yum -y install gcc glibc-devel make ncurses-devel openssl-devel xmlto perl wget gtk2-devel binutils-devel //(2)下载 wget http://erlang.org/download/otp_src_22.0.tar.gz //(3)解压 tar -zxvf otp_src_22.0.tar.gz //(4)移动 mv otp_src_22.0 /usr/local/ //(5)切换目录 cd /usr/local/otp_src_22.0/ //(6)创建即将安装的目录 mkdir ../erlang //(7)配置安装路径 ./configure --prefix=/usr/local/erlang //(8)安装 make install //(9)查看一下是否安装成功 ll /usr/local/erlang/bin //(10)添加环境变量 echo 'export PATH=$PATH:/usr/local/erlang/bin' >> /etc/profile //(11)刷新环境变量 source /etc/profile //(12)甩一条命令 erl halt(). 退出 //---------------------安装RMQ------------------------- //(13)下载 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.15/rabbitmq-server-generic-unix-3.7.15.tar.xz //(14)由于是tar.xz格式的所以需要用到xz,没有的话就先安装 yum install -y xz //(15)第一次解压 /bin/xz -d rabbitmq-server-generic-unix-3.7.15.tar.xz //(16)第二次解压 tar -xvf rabbitmq-server-generic-unix-3.7.15.tar //(17)移动 mv rabbitmq_server-3.7.15/ /usr/local/ //(18)改名 mv /usr/local/rabbitmq_server-3.7.15 /usr/local/rabbitmq //(19)配置环境变量 echo 'export PATH=$PATH:/usr/local/rabbitmq/sbin' >> /etc/profile //(20)刷新环境变量 source /etc/profile //(21)创建配置目录 mkdir /etc/rabbitmq //(22)启动:rabbitmq-server -detached //(23)停止:rabbitmqctl stop //(24)状态:rabbitmqctl status //(25)开放端口 firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent //(26)开启web插件 rabbitmq-plugins enable rabbitmq_management //(27)访问 http://wdfgdzx.top:15672/ 默认账号密码:guest guest(这个账号只允许本机访问) //(28)查看所有用户 rabbitmqctl list_users //(29)添加一个用户 rabbitmqctl add_user xlliu24 s19911009! //(30)配置权限 rabbitmqctl set_permissions -p "/" xlliu24 ".*" ".*" ".*" //(31)查看用户权限 rabbitmqctl list_user_permissions xlliu24 //(32)设置tag rabbitmqctl set_user_tags xlliu24 administrator //(33)删除用户(安全起见,删除默认用户) rabbitmqctl delete_user guest //(34)然后用新用户登录,成功后看到界面
二.如何使用RMQ
//1.Hello World //(1)引入maven包 amqp-client commons-io //(2)P(生产者) -发消息-队列hello(中间件)- 接受消息-C(消费者) //生产者 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; //发消息 public class Producer { //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args) throws Exception { //创建工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //工厂IP 连接RMQ的队列 connectionFactory.setHost("47.105.174.97"); //用户名密码 connectionFactory.setUsername("xlliu24"); connectionFactory.setPassword("s19911009!"); //创建连接 Connection connection =connectionFactory.newConnection(); //获取信道 Channel channel =connection.createChannel(); //创建队列 /** * 1.队列名称 * 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘 * 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许 * 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反 * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); //发消息 String message="Hello World"; /** * 1.发送到那个交换机 * 2.路由的key值 本次是队列的名称 * 3.其他参数信息 * 4.发送的消息 * */ channel.basicPublish("",QUEUE_NAME,null,message.getBytes()); System.out.println("消息发送完毕..."); } } //消费者 package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="Hello"; //接收消息 public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("47.105.174.97"); connectionFactory.setUsername("xlliu24"); connectionFactory.setPassword("s19911009!"); //创建新链接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println(new String(message.getBody())); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println("消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ channel.basicConsume("hello",true,deliverCallback,cancelCallback); } } ------------------------------------ //2.Work Queues 工作队列 //(1)生产者-大量发消息-队列hello-接受消息-N个消费者(工作线程) //(2)注意:一个消息只能被处理一次,不能被处理多次。所以工作线程采用的是轮训分发消息 // 抽取信道工具类 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RMQUtil{ public static Channel getChannel() throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("47.105.174.97"); connectionFactory.setUsername("xlliu24"); connectionFactory.setPassword("s19911009!"); //创建新链接 Connection connection=connectionFactory.newConnection(); return connection.createChannel(); } } ----------------------- package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer { //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args) throws Exception { //获取信道 Channel channel =RMQUtil.getChannel(); //创建队列 /** * 1.队列名称 * 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘 * 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许 * 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反 * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); //发消息 String message="Hello World"; /** * 1.发送到那个交换机 * 2.路由的key值 本次是队列的名称 * 3.其他参数信息 * 4.发送的消息 * */ for(int i=0;i<1000;i++){ channel.basicPublish("",QUEUE_NAME,null,(message+" "+i).getBytes()); System.out.println("消息发送完毕..."); } } } ------------------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C0接收到的消息: "+new String(message.getBody())); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C0等待接受消息..."); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } ---------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody())); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C1等待接受消息..."); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } ---------------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer2 { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C2接收到的消息: "+new String(message.getBody())); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C2等待接受消息..."); channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } -----------------控制台打印信息如下---------------------- C0接收到的消息: Hello World 0 C0接收到的消息: Hello World 3 C0接收到的消息: Hello World 6 C0接收到的消息: Hello World 9 ---------- C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 4 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 10 ----------- C2接收到的消息: Hello World 2 C2接收到的消息: Hello World 5 C2接收到的消息: Hello World 8 C2接收到的消息: Hello World 11 //3.消息应答 //(1)即消费者处理完毕后-应答-生产者删除消息 //(2)自动应答对环境要求高,并不可取; //(3)手动应答:basicAck(肯定) basicNack basicReject //(4)批量应答multiple 当前8 true 5 6 7 8都确认, false只会应答8,建议使用false //(5)消息自动重新入队,当某个通道发生异常时,RMQ将了解到消息未完全处理,并将对其重新排队。让其他通道处理,保证消息的不丢失与处理。 //(6)目的:手动应答保证消息的不丢失。 // 抽取信道工具类 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class RMQUtil{ public static Channel getChannel() throws Exception{ //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("47.105.174.97"); connectionFactory.setUsername("xlliu24"); connectionFactory.setPassword("s19911009!"); //创建新链接 Connection connection=connectionFactory.newConnection(); return connection.createChannel(); } } -------- package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer { //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args) throws Exception { //获取信道 Channel channel =RMQUtil.getChannel(); //创建队列 /** * 1.队列名称 * 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘 * 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许 * 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反 * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME,true,false,false,null); //发消息 String message="Hello World"; /** * 1.发送到那个交换机 * 2.路由的key值 本次是队列的名称 * 3.其他参数信息 * 4.发送的消息 * */ for(int i=0;i<10;i++){ channel.basicPublish("",QUEUE_NAME,null,(message+" "+i).getBytes("UTF-8")); System.out.println((message+" "+i)+" 消息发送完毕..."); } } } -------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ try { Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); } System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")); // 1.标记tag 2.不批量应答 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C0等待接受消息..."); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } } ------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")); // 1.标记tag 2.不批量应答 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C1等待接受消息..."); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } } ------------如果C0在等待过程中宕机或者发生异常,全部消息有C1处理------------------ C0等待接受消息... C0接收到的消息: Hello World 0 -------------------- C1等待接受消息... C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 3 C1接收到的消息: Hello World 5 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 9 C1接收到的消息: Hello World 2 C1接收到的消息: Hello World 4 C1接收到的消息: Hello World 6 C1接收到的消息: Hello World 8 //4.队列RMQ持久化 //(1)channel.queueDeclare(QUEUE_NAME, true,false,false,null); //(2)持久化后重启RMQ后仍然存在 //5.消息持久化 //(1)channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8")); //(2)但是不是绝对的,如果想绝对要参考后面的发布确认章节 //6.不公平分发 //(1)轮训分发—> //(2)问题在两个消费者就会出现,C1处理快,C0处理慢,而分发任务一致,就会出现能者不多劳。 //(3)由消费者设置分发策略: channel.basicQos(1); package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; //生产者,发送大量的消息 public class Producer { //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args) throws Exception { //获取信道 Channel channel =RMQUtil.getChannel(); //创建队列 /** * 1.队列名称 * 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘 * 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许 * 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反 * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME, true,false,false,null); //发消息 String message="Hello World"; /** * 1.发送到那个交换机 * 2.路由的key值 本次是队列的名称 * 3.其他参数信息 * 4.发送的消息 * */ for(int i=0;i<10;i++){ channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8")); System.out.println((message+" "+i)+" 消息发送完毕..."); } } } package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //设置分发策略 channel.basicQos(1); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ try { Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); } System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")); // 1.标记tag 2.不批量应答 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C0等待接受消息..."); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } } package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //设置分发策略 channel.basicQos(1); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")); // 1.标记tag 2.不批量应答 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C1等待接受消息..."); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } } -------------------控制台信息--------------------------------- C0等待接受消息... C0接收到的消息: Hello World 0 C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 2 C1接收到的消息: Hello World 3 C1接收到的消息: Hello World 4 C1接收到的消息: Hello World 5 C1接收到的消息: Hello World 6 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 8 C1接收到的消息: Hello World 9 //7.预取值 //(1)就是预先指定C0分到多少,C1分到多少 //(2)也是消费者设置 channel.basicQos(3); channel.basicQos(7); package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //设置分发策略/预取值 channel.basicQos(3); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ try { Thread.sleep(30000); } catch (Exception e) { e.printStackTrace(); } System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")); // 1.标记tag 2.不批量应答 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C0等待接受消息..."); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } } package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="hello"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //设置分发策略/预取值 channel.basicQos(7); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ try { Thread.sleep(1000); } catch (Exception e) { e.printStackTrace(); } System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")); // 1.标记tag 2.不批量应答 channel.basicAck(message.getEnvelope().getDeliveryTag(),false); }; //取消消费 CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ System.out.println("C1等待接受消息..."); channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback); } } ---控制台信息--- C1等待接受消息... C1接收到的消息: Hello World 1 C1接收到的消息: Hello World 3 C1接收到的消息: Hello World 5 C1接收到的消息: Hello World 6 C1接收到的消息: Hello World 7 C1接收到的消息: Hello World 8 C1接收到的消息: Hello World 9 --- C0接收到的消息: Hello World 0 C0接收到的消息: Hello World 2 C0接收到的消息: Hello World 4 //8.发布确认 //(1)是解决消息不丢失的重要环节 //(2)生产者-发消息-队列hello- //(3)1设置队列持久化->2设置消息持久化—>3发布确认(这里第3条才能确认消息真的保存在磁盘上了) //(4)开启发布确认的方法 channel.confirmSelect(); //(5)单个确认发布 批量确认发布 异步确认发布三种方法 //(6)单个确认发布: 发一条确认一条,速度慢 //(7)批量确认发布: 34集 //(8)异步确认发布: 第三种最牛批,性能最厉害 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmCallback; import com.rabbitmq.client.MessageProperties; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; //生产者,发送大量的消息 public class Producer { //队列名称 public static final String QUEUE_NAME="hello"; //发消息 public static void main(String[] args) throws Exception { //单个确认 耗时31830ms //publicMessageIndividually(); //批量确认 耗时1660ms //publicMessageBatch(); //异步确认 耗时1102ms publicMessageAsync(); } //单个确认 public static void publicMessageIndividually() throws Exception{ long startTime=System.currentTimeMillis(); //获取信道 Channel channel =RMQUtil.getChannel(); //开启发布确认 channel.confirmSelect(); //创建队列 /** * 1.队列名称 * 2.队列里面的消息是否持久化 默认情况消息存储在内存中 持久化存储在磁盘 * 3.该队列是否只供一个消费者进行消费 是否进行消息共享 true可以多个消费者消费 false则不允许 * 4.是否自动删除 最后一个消费者断开后 改队列是否自动删除 true自动删除 false反 * 5.其他参数 * */ channel.queueDeclare(QUEUE_NAME, true,false,false,null); //发消息 String message="Hello World"; /** * 1.发送到那个交换机 * 2.路由的key值 本次是队列的名称 * 3.其他参数信息 * 4.发送的消息 * */ for(int i=0;i<1000;i++){ channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8")); System.out.println((message+" "+i)+" 消息发送完毕..."); //进行发布确认 boolean flag=channel.waitForConfirms(); if(flag){ System.out.println("确认发送成功..."); }else{ System.err.println("不确认发送成功..."); } } long endTime=System.currentTimeMillis(); System.out.println("耗时"+(endTime-startTime)+"ms"); } //批量发布确认 public static void publicMessageBatch() throws Exception{ long startTime=System.currentTimeMillis(); //获取信道 Channel channel =RMQUtil.getChannel(); //开启发布确认 channel.confirmSelect(); //创建队列 channel.queueDeclare(QUEUE_NAME, true,false,false,null); //发消息 //批量确认消息大小 int batchSize=100; String message="Hello World"; for(int i=0;i<1000;i++){ channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8")); System.out.println((message+" "+i)+" 消息发送完毕..."); //判断达到100条,批量确认一次 if(i%batchSize==0){ boolean flag=channel.waitForConfirms(); if(flag){ System.out.println("确认100条发送成功..."); }else{ System.err.println("不确认100条发送成功..."); } } } long endTime=System.currentTimeMillis(); System.out.println("耗时"+(endTime-startTime)+"ms"); } //异步确认发布 //生产消息的时候一一编号,也叫给ID或者提取特征,有无问题broker会通知你 //速度最快,效率最高,实用性最大 public static void publicMessageAsync() throws Exception{ long startTime=System.currentTimeMillis(); //线程安全有序的一个哈希表 适用于高并发的情况下 ConcurrentSkipListMap<Long,Object> concurrentSkipListMap=new ConcurrentSkipListMap<>(); //获取信道 Channel channel =RMQUtil.getChannel(); //开启发布确认 channel.confirmSelect(); //消息监听器,监听消息是否发送成功了 //确认成功的回调函数 ConfirmCallback ackCallBack=(deliveryTag,multiple) ->{ System.out.println("确认发送成功的消息 "+deliveryTag); //2.删除已经确认的消息 //是否批量确认 if(multiple){ ConcurrentNavigableMap<Long,Object> concurrentNavigableMap= concurrentSkipListMap.headMap(deliveryTag); System.out.println("即将删除 "+concurrentNavigableMap); concurrentNavigableMap.clear(); }else{ System.out.println("即将删除 "+concurrentSkipListMap.get(deliveryTag)); concurrentSkipListMap.remove(deliveryTag); } }; //确认失败的回调函数 ConfirmCallback nackCallBack=(deliveryTag,multiple) ->{ System.err.println("不能确认发送成功的消息 "+deliveryTag); //打印一下未确认的消息 String temp= (String) concurrentSkipListMap.get(deliveryTag); System.out.println("不能确认发送成功的消息 "+temp); }; channel.addConfirmListener(ackCallBack,nackCallBack); //创建队列 channel.queueDeclare(QUEUE_NAME, true,false,false,null); String message="异步确认... Hello World"; for(int i=0;i<1000;i++){ //1.此处记录所有要发送的消息 concurrentSkipListMap.put(channel.getNextPublishSeqNo()-2,(message+" "+i)); channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,(message+" "+i).getBytes("UTF-8")); //System.out.println(channel.getNextPublishSeqNo()); } Thread.sleep(5000); System.err.println(concurrentSkipListMap.size()); long endTime=System.currentTimeMillis(); System.out.println("耗时"+(endTime-startTime)+"ms"); } }
三.发布订阅
//1.交换机 //(1)生产者-发消息-队列-消费者 //(2)原来的消息只能被消费一次,如果做到1个消息被多个消费者消费呢? //(3)生产者-交换机-RoutingKey-队列(仍是消息只能被消费一次)-消费者 // -交换机-RoutingKey-队列(仍是消息只能被消费一次)-消费者... //(4)RMQ把消息给队列,必须走交换机,不指定是默认的交换机。 //(5)交换机的类型:直接direct 主题topic 标题headers 扇出fanout 无名"" //(6)绑定bindings 用RoutingKey可以区分队列 //2.Fanout 广播 发布订阅模式 根源是发把消息发给交换机,然后用RoutingKey关联到多个队列,给多个队列同时发消息 //(1)P-X-n个Q-n个消费者 package com.day.controller; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; //生产者,发送大量的消息 public class Producer { //交换机名 public static final String EXCHANGE_NAME="logs"; //发消息 public static void main(String[] args) throws Exception { Channel channel =RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout"); Scanner scanner=new Scanner(System.in); while(scanner.hasNext()){ String message=scanner.next(); channel.basicPublish(EXCHANGE_NAME,"", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); System.out.println("生产者发出消息: "+message); } } } -------------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="public0"; //交换机名 public static final String EXCHANGE_NAME="logs"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //队列,生成临时队列,队列名称是随机的 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //绑定交换机与队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); System.out.println("C0等待,把接受到的消息打印在屏幕上..."); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } -------------------------------- package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="public1"; public static final String EXCHANGE_NAME="logs"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //队列,生成临时队列,队列名称是随机的 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //绑定交换机与队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,""); System.out.println("C1等待,把接受到的消息打印在屏幕上..."); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } //3.Direct Exchange直接交换机 //(1)路由模式,想给谁传给谁传,根据RoutingKey不同指定轻松实现。绑定相同的RoutingKey就是fanout模式了 package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.Scanner; //生产者,发送大量的消息 public class Producer { //交换机名 public static final String EXCHANGE_NAME="direct_logs"; public static final String CONSOLE_INFO="info"; public static final String CONSOLE_WARNING="warning"; public static final String DISK_ERROR="error"; //发消息 public static void main(String[] args) throws Exception { Channel channel =RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner=new Scanner(System.in); System.out.println("生产者准备从控制台获取信息发送..."); while(scanner.hasNext()){ String message=scanner.next(); channel.basicPublish(EXCHANGE_NAME,DISK_ERROR, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8")); System.out.println("生产者发出消息: "+message); } } } ---------- package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="console"; //交换机名 public static final String EXCHANGE_NAME="direct_logs"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.DIRECT); //队列,生成临时队列,队列名称是随机的 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //绑定交换机与队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning"); System.out.println("C0等待,把接受到的消息打印在屏幕上..."); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8")); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } ---------- package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="disk"; public static final String EXCHANGE_NAME="direct_logs"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); //队列,生成临时队列,队列名称是随机的 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //绑定交换机与队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error"); System.out.println("C1等待,把接受到的消息打印在屏幕上..."); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8")); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } //4.Topics 主题交换机 //(1)P-X-RoutingKey- //(2)主题交换机的RoutingKey不能随意写,必须是.隔开的单词列表;*代表一个单词,#代表0或多个单词 //(3)支持匹配模式,*.orange.* lazy.# *.*.rabbit //(4)如果绑定# 则这个对垒将接受所有数据,有点像fanout //(5)如果没有# *就是direct交换机,这不就是多个正则吗 package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; import java.util.HashMap; import java.util.Map; import java.util.Scanner; //生产者,发送大量的消息 public class Producer { //交换机名 public static final String EXCHANGE_NAME="topic_logs"; //发消息 public static void main(String[] args) throws Exception { Channel channel =RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String,String> bindingKeyMap=new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到"); bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到"); bindingKeyMap.put("quick.orange.fox","被队列Q1接收到"); bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到"); bindingKeyMap.put("lazy.pink.rabbit","Q2接收一次"); bindingKeyMap.put("quick.brown.fox","被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit","被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit","Q2"); for(String key:bindingKeyMap.keySet()){ String message=bindingKeyMap.get(key); //队列名称是通过key指定的哦 channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes("UTF-8")); System.out.println("生产者发出消息: "+message); } } } ------ package com.day.controller; import com.rabbitmq.client.*; //消费者 public class Consumer { //队列的名称 public static final String QUEUE_NAME="Q1"; //交换机名 public static final String EXCHANGE_NAME="topic_logs"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //队列,生成临时队列,队列名称是随机的 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //绑定交换机与队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*"); System.out.println("C0等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+QUEUE_NAME+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } ------ package com.day.controller; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { //队列的名称 public static final String QUEUE_NAME="Q2"; public static final String EXCHANGE_NAME="topic_logs"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC); //队列,生成临时队列,队列名称是随机的 channel.queueDeclare(QUEUE_NAME,true,false,false,null); //绑定交换机与队列 channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit"); channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#"); System.out.println("C1等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+QUEUE_NAME+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback); } } //5.死信 //(1)消息拒绝 效果过期 队列达到最大长度 //(2)开启消费者0,然后关闭,然后运行生产者发消息,会发现消息在normal_queue,10S后在dead_queue的变化 //(3)无法被消费的消息,由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信 //(4)死信队列机制,消费时发生异常,将消息放到死信队列中,防止消息的丢失 //(5)死信的来源:消息TTL过期、队列达到最大长度、消息被拒绝并且requeue=false; package com.day.controller; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import java.util.HashMap; import java.util.Map; //生产者,发送大量的消息 public class Producer { //交换机名 public static final String NORMAL_EXCHANGE="normal_exchange"; //发消息 public static void main(String[] args) throws Exception { Channel channel =RMQUtil.getChannel(); //单位是ms 即设置10S AMQP.BasicProperties basicProperties=new AMQP.BasicProperties() .builder().expiration("10000").build(); for(int i=0;i<10;i++){ String message="生产者发送的消息: "+i; //TTL -TIME TO LIVE channel.basicPublish(NORMAL_EXCHANGE,"normalBindLine",basicProperties,message.getBytes("UTF-8")); System.out.println("生产者发出消息: "+message); } } } ------ package com.day.controller; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; //消费者 public class Consumer { //普通交换机名 public static final String NORMAL_EXCHANGE="normal_exchange"; //死信交换机 public static final String DEAD_EXCHANGE="dead_exchange"; //两个队列 public static final String NORMAL_QUEUE="normal_queue"; public static final String DEAD_QUEUE="dead_queue"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //----------------------------------- //声明交换机 channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //----------------------------------- //声明队列-要使用特殊的参数才能转发到死信队列 Map<String,Object> argumentsMap=new HashMap<>(); //设置转发的交换机 //argumentsMap.put("x-message-ttl",10000);//也可以在发消息时指定 argumentsMap.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置RoutingKey argumentsMap.put("x-dead-letter-routing-key","deadBindLine"); channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap); //死信队列 channel.queueDeclare(DEAD_QUEUE,true,false,false,null); //----------------------------------- //绑定交换机与队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalBindLine"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadBindLine"); //----------------------------------- System.out.println("C0等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+NORMAL_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback); } } ------ package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); System.out.println("C1等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+DEAD_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTag ->{ System.out.println(consumerTag+" 消费消息被中断..."); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback); } } //5.死信 //(1)队列达到最大长度 package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer { //交换机名 public static final String NORMAL_EXCHANGE="normal_exchange"; //发消息 public static void main(String[] args) throws Exception { Channel channel =RMQUtil.getChannel(); //单位是ms 即设置10S /* AMQP.BasicProperties basicProperties=new AMQP.BasicProperties() .builder().expiration("10000").build();*/ for(int i=0;i<10;i++){ String message="生产者发送的消息: "+i; channel.basicPublish(NORMAL_EXCHANGE,"normalBindLine",null,message.getBytes("UTF-8")); System.out.println("生产者发出消息: "+message); } } } ------ package com.day.controller; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; //消费者 public class Consumer { //普通交换机名 public static final String NORMAL_EXCHANGE="normal_exchange"; //死信交换机 public static final String DEAD_EXCHANGE="dead_exchange"; //两个队列 public static final String NORMAL_QUEUE="normal_queue"; public static final String DEAD_QUEUE="dead_queue"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //----------------------------------- //声明交换机 channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //----------------------------------- //声明队列-要使用特殊的参数才能转发到死信队列 Map<String,Object> argumentsMap=new HashMap<>(); //设置转发的交换机 //argumentsMap.put("x-message-ttl",10000);//也可以在发消息时指定 argumentsMap.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置RoutingKey argumentsMap.put("x-dead-letter-routing-key","deadBindLine"); argumentsMap.put("x-max-length",6); channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap); //死信队列 channel.queueDeclare(DEAD_QUEUE,true,false,false,null); //----------------------------------- //绑定交换机与队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalBindLine"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadBindLine"); //----------------------------------- System.out.println("C0等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+NORMAL_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTage ->{ System.out.println(consumerTage+" 消费消息被中断..."); }; channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback); } } ------ package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); System.out.println("C1等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+DEAD_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTag ->{ System.out.println(consumerTag+" 消费消息被中断..."); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback); } } //5.死信 //(1)消息被拒绝 package com.day.controller; import com.rabbitmq.client.Channel; //生产者,发送大量的消息 public class Producer { //交换机名 public static final String NORMAL_EXCHANGE="normal_exchange"; //发消息 public static void main(String[] args) throws Exception { Channel channel =RMQUtil.getChannel(); //单位是ms 即设置10S /* AMQP.BasicProperties basicProperties=new AMQP.BasicProperties() .builder().expiration("10000").build();*/ for(int i=0;i<10;i++){ String message="生产者发送的消息: "+i; channel.basicPublish(NORMAL_EXCHANGE,"normalBindLine",null,message.getBytes("UTF-8")); System.out.println("生产者发出消息: "+message); } } } ------ package com.day.controller; import com.rabbitmq.client.*; import java.util.HashMap; import java.util.Map; //消费者 public class Consumer { //普通交换机名 public static final String NORMAL_EXCHANGE="normal_exchange"; //死信交换机 public static final String DEAD_EXCHANGE="dead_exchange"; //两个队列 public static final String NORMAL_QUEUE="normal_queue"; public static final String DEAD_QUEUE="dead_queue"; //接收消息 public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); //----------------------------------- //声明交换机 channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); //----------------------------------- //声明队列-要使用特殊的参数才能转发到死信队列 Map<String,Object> argumentsMap=new HashMap<>(); //设置转发的交换机 //argumentsMap.put("x-message-ttl",10000);//也可以在发消息时指定 argumentsMap.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置RoutingKey argumentsMap.put("x-dead-letter-routing-key","deadBindLine"); channel.queueDeclare(NORMAL_QUEUE,true,false,false,argumentsMap); //死信队列 channel.queueDeclare(DEAD_QUEUE,true,false,false,null); //----------------------------------- //绑定交换机与队列 channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"normalBindLine"); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"deadBindLine"); //----------------------------------- System.out.println("C0等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ String msg=new String(message.getBody(),"UTF-8"); if(msg.contains("5")){ //拒绝且不放回原队列 channel.basicReject(message.getEnvelope().getDeliveryTag(),false); }else{ System.out.println("C0接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+NORMAL_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey()); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } }; CancelCallback cancelCallback=consumerTag ->{ System.out.println(consumerTag+" 消费消息被中断..."); }; //必须开启手动应答 channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,cancelCallback); } } ------ package com.day.controller; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; //消费者 public class Consumer1 { public static final String DEAD_QUEUE="dead_queue"; public static void main(String[] args) throws Exception { //信道 Channel channel=RMQUtil.getChannel(); System.out.println("C1等待,把接受到的消息打印在屏幕上..."); //接受消息 DeliverCallback deliverCallback=(consumerTag,message) ->{ System.out.println("C1接收到的消息: "+new String(message.getBody(),"UTF-8") +"。通过队列 "+DEAD_QUEUE+"。绑定键: "+message.getEnvelope().getRoutingKey()); }; CancelCallback cancelCallback=consumerTag ->{ System.out.println(consumerTag+" 消费消息被中断..."); }; channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback); } } //56集
四.Spinrgboot整合RMQ
//1.延迟队列 //(1)延迟队列本质上就是TTL过期的死信队列 //(2)P-X(普通交换机)-Y(延迟交换机)-3个队列,实现两种不同的延迟效果10S和40S //(3)花费了我2个小时因为Consumer类中的Channel导错了包,应该为import com.rabbitmq.client.Channel; //(4)学会了lombok.extern.slf4j.Slf4j与yml文件配置log.info的使用 //(5)见识了Springboot注解配置的强大。只要功夫深,问题能解决 ------POM.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>SpringBoot</groupId> <artifactId>springboot-maven</artifactId> <version>0.0.1-SNAPSHOT</version> <name>springboot-maven</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <distributionManagement> <repository> <id>releases</id> <name>Nexus Release Repository</name> <url>http://wdfgdzx.top:8081/nexus/content/repositories/releases/</url> </repository> <snapshotRepository> <id>snapshots</id> <name>Nexus Snapshot Repository</name> <url>http://wdfgdzx.top:8081/nexus/content/repositories/snapshots/</url> </snapshotRepository> </distributionManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> </dependency> <!--整合MyBatis--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> <!--数据库连接池--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.12</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>6.0.6</version> </dependency> <!--redis依赖包--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency>--> <!-- Thymeleaf 自动配置 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-thymeleaf</artifactId> </dependency> <!-- 允许使用非严格的 HTML 语法 --> <dependency> <groupId>net.sourceforge.nekohtml</groupId> <artifactId>nekohtml</artifactId> <version>1.9.22</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-test</artifactId> </dependency> <!--SpringBoot热部署配置 --> <!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency>--> <dependency> <groupId>org.jetbrains</groupId> <artifactId>annotations</artifactId> <version>13.0</version> <scope>compile</scope> </dependency> <!--json--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.46</version> </dependency> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.9</version> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.9.1</version> </dependency> <dependency> <groupId>com.squareup.okio</groupId> <artifactId>okio</artifactId> <version>1.15.0</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.5.6</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpcore</artifactId> <version>4.4.10</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.5.6</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson --> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.6.2</version> </dependency> <dependency> <groupId>net.minidev</groupId> <artifactId>json-smart</artifactId> </dependency> <!--Rich文本开始--> <dependency> <groupId>com.gitee.qdbp.thirdparty</groupId> <artifactId>ueditor</artifactId> <version>1.4.3.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.json/json --> <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> <version>20160810</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-io/commons-io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-fileupload/commons-fileupload --> <dependency> <groupId>commons-fileupload</groupId> <artifactId>commons-fileupload</artifactId> <version>1.3.1</version> </dependency> <!-- https://mvnrepository.com/artifact/commons-codec/commons-codec --> <dependency> <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> <version>1.9</version> </dependency> <!--Rich文本结束--> <!-- 读取Excel --> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>4.1.2</version> </dependency> <!--顺丰本地jar包放置与引入--> <dependency> <groupId>com.iflytek.msp.sfexpress</groupId> <artifactId>express-sdk</artifactId> <version>2.1.5</version> <scope>system</scope> <systemPath>${project.basedir}/src/main/resources/libs/sf-csim-express-sdk-V2.1.5.jar</systemPath> </dependency> <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <!-- <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.1.5.RELEASE</version> </plugin>--> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <includeSystemScope>true</includeSystemScope> </configuration> <version>2.1.5.RELEASE</version> </plugin> <!-- <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <fork>true</fork> <addResources>true</addResources> </configuration> </plugin>--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project> ------yml server: port: 8001 tomcat: accesslog: buffered: true directory: /home/A/SpringBoot enabled: true file-date-format: .yyyy-MM-dd pattern: common prefix: access_log rename-on-rotate: false request-attributes-enabled: false rotate: true suffix: .log spring: #devtools: #restart: #enabled=true: #支持热部署 可能导致重启,然后非实时语音转写报错。 rabbitmq: host: wdfgdzx.top port: 5672 username: xlliu24 password: s19911009! redis: #配置redis host: wdfgdzx.top prot: 6379 datasource: name: mydb type: com.alibaba.druid.pool.DruidDataSource url: jdbc:mysql://wdfgdzx.top:3306/mydb?serverTimezone=GMT%2b8 username: root password: s19911009! driver-class-name: com.mysql.cj.jdbc.Driver thymeleaf: prefix: classpath:/site/ check-template-location: true #check-tempate-location: 检查模板路径是否存在 enabled: true encoding: UTF-8 content-type: text/html cache: false mode: HTML suffix: .html servlet: multipart: #配置文件上传 max-file-size: 1000MB #设置上传的单个文件最大值,单位可以是 MB、KB,默认为 1MB max-request-size: 1024MB #设置多文件上传时,单次内多个文件的总量的最大值,单位可以是 MB、KB,默认为 10 M mybatis: mapper-locations: classpath*:/mybatis/*Mapper.xml logging: level: root: info -------交换机声明、队列声明、绑定 package com.day.controller; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RMQConfig { //普通交换机 private static final String COMMON_EXCHANGE="X"; //死信交换机 private static final String DEAD_EXCHANGE="Y"; //普通队列 private static final String COMMON_QUEUE_A="QA"; private static final String COMMON_QUEUE_B="QB"; //死信队列 private static final String DEAD_QUEUE_D="QD"; //---------------------------------- //声明交换 @Bean("commonExchange") public DirectExchange commonExchange(){ return new DirectExchange(COMMON_EXCHANGE); } @Bean("deadExchange") public DirectExchange deadExchange(){ return new DirectExchange(DEAD_EXCHANGE); } //---------------------------------- //声明队列 @Bean("commonQueueA") public Queue commonQueueA(){ Map<String,Object> arguments=new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信Routing Key arguments.put("x-dead-letter-routing-key","YD"); //设置TTL arguments.put("x-message-ttl",10000); return QueueBuilder.durable(COMMON_QUEUE_A) .withArguments(arguments) .build(); } @Bean("commonQueueB") public Queue commonQueueB(){ Map<String,Object> arguments=new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信Routing Key arguments.put("x-dead-letter-routing-key","YD"); //设置TTL arguments.put("x-message-ttl",40000); return QueueBuilder.durable(COMMON_QUEUE_B) .withArguments(arguments) .build(); } @Bean("deadQueueD") public Queue deadQueueD(){ return QueueBuilder.durable(DEAD_QUEUE_D) .build(); } //---------------------------------- //绑定 @Bean public Binding commonQueueABindingCommonExchange( @Qualifier("commonQueueA") Queue commonQueueA, @Qualifier("commonExchange") DirectExchange commonExchange){ return BindingBuilder .bind(commonQueueA).to(commonExchange) .with("XA"); } @Bean public Binding commonQueueBBindingCommonExchange( @Qualifier("commonQueueB") Queue commonQueueB, @Qualifier("commonExchange") DirectExchange commonExchange){ return BindingBuilder .bind(commonQueueB).to(commonExchange) .with("XB"); } @Bean public Binding deadQueueDBindingDeadExchange( @Qualifier("deadQueueD") Queue deadQueueD, @Qualifier("deadExchange") DirectExchange deadExchange){ return BindingBuilder .bind(deadQueueD).to(deadExchange) .with("YD"); } } ------生产者 http://localhost:8001/ttl/sendMsg/Hello package com.day.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.io.Serializable; import java.util.Date; //发送延迟消息 @Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController implements Serializable{ @Autowired private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ //System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message); log.info("当前时间: {},发送一条信息给两个TTL队列:{}", new Date().toString(),message); rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列: "+message); rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列: "+message); } } -----消费者 package com.day.controller; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 @Slf4j @Component public class Consumer implements Serializable { @RabbitListener(queues="QD") public void receiveD(Message message, Channel channel) throws Exception{ byte[] body=message.getBody(); String msg=new String(body, Charset.forName("UTF-8")); log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(),msg); } } ------SwaggerConfig 非必须 package com.day.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.service.ApiInfo; import springfox.documentation.service.Contact; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig(){ return new Docket(DocumentationType.SWAGGER_2) .groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo(){ return new ApiInfoBuilder() .title("RMQ接口文档") .description("本文描述了RMQ的微服务接口定义") .version("1.0") .contact(new Contact("wdfgdzx","http://wdfgdzx.top","wdfgdzx@163.com")) .build(); } }
//2.延迟队列的优化 package com.day.controller; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RMQConfig { //普通交换机 private static final String COMMON_EXCHANGE="X"; //死信交换机 private static final String DEAD_EXCHANGE="Y"; //普通队列 private static final String COMMON_QUEUE_A="QA"; private static final String COMMON_QUEUE_B="QB"; //延迟队列优化 private static final String COMMON_QUEUE_C="QC"; //死信队列 private static final String DEAD_QUEUE_D="QD"; //---------------------------------- //声明交换 @Bean("commonExchange") public DirectExchange commonExchange(){ return new DirectExchange(COMMON_EXCHANGE); } @Bean("deadExchange") public DirectExchange deadExchange(){ return new DirectExchange(DEAD_EXCHANGE); } //---------------------------------- //声明队列 @Bean("commonQueueA") public Queue commonQueueA(){ Map<String,Object> arguments=new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信Routing Key arguments.put("x-dead-letter-routing-key","YD"); //设置TTL arguments.put("x-message-ttl",10000); return QueueBuilder.durable(COMMON_QUEUE_A) .withArguments(arguments).build(); } //绑定 @Bean public Binding commonQueueABindingCommonExchange( @Qualifier("commonQueueA") Queue commonQueueA, @Qualifier("commonExchange") DirectExchange commonExchange){ return BindingBuilder .bind(commonQueueA).to(commonExchange).with("XA"); } //声明队列 @Bean("commonQueueB") public Queue commonQueueB(){ Map<String,Object> arguments=new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信Routing Key arguments.put("x-dead-letter-routing-key","YD"); //设置TTL arguments.put("x-message-ttl",40000); return QueueBuilder.durable(COMMON_QUEUE_B) .withArguments(arguments).build(); } //绑定 @Bean public Binding commonQueueBBindingCommonExchange( @Qualifier("commonQueueB") Queue commonQueueB, @Qualifier("commonExchange") DirectExchange commonExchange){ return BindingBuilder .bind(commonQueueB).to(commonExchange).with("XB"); } //声明队列 @Bean("commonQueueC") public Queue commonQueueC(){ Map<String,Object> arguments=new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE); //设置死信Routing Key arguments.put("x-dead-letter-routing-key","YD"); return QueueBuilder.durable(COMMON_QUEUE_C) .withArguments(arguments).build(); } //绑定 @Bean public Binding commonQueueCBindingCommonExchange( @Qualifier("commonQueueC") Queue commonQueueC, @Qualifier("commonExchange") DirectExchange commonExchange){ return BindingBuilder .bind(commonQueueC).to(commonExchange).with("XC"); } //声明队列 @Bean("deadQueueD") public Queue deadQueueD(){ return QueueBuilder.durable(DEAD_QUEUE_D).build(); } //绑定 @Bean public Binding deadQueueDBindingDeadExchange( @Qualifier("deadQueueD") Queue deadQueueD, @Qualifier("deadExchange") DirectExchange deadExchange){ return BindingBuilder .bind(deadQueueD).to(deadExchange).with("YD"); } } ----------------------------------- package com.day.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 @Slf4j @RestController @RequestMapping("/ttl") public class ProducerController implements Serializable{ @Resource private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ //System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message); log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(),message); rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列: "+message); rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列: "+message); } //开始发消息 消息 TTL @GetMapping("/sendExpireMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}", new Date(),ttlTime,message); rabbitTemplate.convertAndSend("X","XC",message,msg ->{ msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } } ------------------------------------------ package com.day.controller; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 @Slf4j @Component public class Consumer implements Serializable { @RabbitListener(queues="QD") public void receiveD(Message message, Channel channel) throws Exception{ byte[] body=message.getBody(); String msg=new String(body, Charset.forName("UTF-8")); log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(),msg); } }
//3.日志文件配置与使用 配置------------- <?xml version="1.0" encoding="UTF-8"?> <!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 --> <!-- scan:当此属性设置为true时,配置文档如果发生改变,将会被重新加载,默认值为true --> <!-- scanPeriod:设置监测配置文档是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。 当scan为true时,此属性生效。默认的时间间隔为1分钟。 --> <!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 --> <configuration scan="true" scanPeriod="10 seconds"> <contextName>logback-spring</contextName> <!-- name的值是变量的名称,value的值时变量定义的值。通过定义的值会被插入到logger上下文中。定义后,可以使“${}”来使用变量。 --> <property name="logging.path" value="src/main/resources/static/client" /> <!--0. 日志格式和颜色渲染 --> <!-- 彩色日志依赖的渲染类 --> <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" /> <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" /> <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" /> <!-- 彩色日志格式 --> <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/> <!--1. 输出到控制台--> <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> <!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息--> <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> <level>debug</level> </filter> <encoder> <Pattern>${CONSOLE_LOG_PATTERN}</Pattern> <!-- 设置字符集 --> <charset>UTF-8</charset> </encoder> </appender> <!--2. 输出到文档--> <!-- 2.1 level为 DEBUG 日志,时间滚动输出 --> <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 正在记录的日志文档的路径及文档名 --> <file>${logging.path}/web_debug.log</file> <!--日志文档输出格式--> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> <!-- 设置字符集 --> </encoder> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- 日志归档 --> <fileNamePattern>${logging.path}/web-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!--日志文档保留天数--> <maxHistory>15</maxHistory> </rollingPolicy> <!-- 此日志文档只记录debug级别的 --> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>debug</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <!-- 2.2 level为 INFO 日志,时间滚动输出 --> <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 正在记录的日志文档的路径及文档名 --> <file>${logging.path}/web_info.log</file> <!--日志文档输出格式--> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> </encoder> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- 每天日志归档路径以及格式 --> <fileNamePattern>${logging.path}/web-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!--日志文档保留天数--> <maxHistory>15</maxHistory> </rollingPolicy> <!-- 此日志文档只记录info级别的 --> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>info</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <!-- 2.3 level为 WARN 日志,时间滚动输出 --> <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 正在记录的日志文档的路径及文档名 --> <file>${logging.path}/web_warn.log</file> <!--日志文档输出格式--> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> <!-- 此处设置字符集 --> </encoder> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${logging.path}/web-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!--日志文档保留天数--> <maxHistory>15</maxHistory> </rollingPolicy> <!-- 此日志文档只记录warn级别的 --> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>warn</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <!-- 2.4 level为 ERROR 日志,时间滚动输出 --> <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- 正在记录的日志文档的路径及文档名 --> <file>${logging.path}/web_error.log</file> <!--日志文档输出格式--> <encoder> <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> <charset>UTF-8</charset> <!-- 此处设置字符集 --> </encoder> <!-- 日志记录器的滚动策略,按日期,按大小记录 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${logging.path}/web-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <maxFileSize>100MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!--日志文档保留天数--> <maxHistory>15</maxHistory> </rollingPolicy> <!-- 此日志文档只记录ERROR级别的 --> <filter class="ch.qos.logback.classic.filter.LevelFilter"> <level>ERROR</level> <onMatch>ACCEPT</onMatch> <onMismatch>DENY</onMismatch> </filter> </appender> <!-- <logger>用来设置某一个包或者具体的某一个类的日志打印级别、 以及指定<appender>。<logger>仅有一个name属性, 一个可选的level和一个可选的addtivity属性。 name:用来指定受此logger约束的某一个包或者具体的某一个类。 level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF, 还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。 如果未设置此属性,那么当前logger将会继承上级的级别。 addtivity:是否向上级logger传递打印信息。默认是true。 <logger name="org.springframework.web" level="info"/> <logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/> --> <!-- 使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作: 第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息 第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别: 【logging.level.org.mybatis=debug logging.level.dao=debug】 --> <!-- root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性 level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF, 不能设置为INHERITED或者同义词NULL。默认是DEBUG 可以包含零个或多个元素,标识这个appender将会添加到这个logger。 --> <!--过滤掉spring和mybatis的一些无用的DEBUG信息--> <logger name="org.springframework" level="INFO"></logger> <logger name="org.mybatis" level="INFO"></logger> <logger name="org.apache.zookeeper" level="INFO"></logger> <!-- 4. 最终的策略 --> <!-- 4.1 开发环境:打印控制台--> <springProfile name="dev"> <logger name="com.dowin.globalvillage.controller" level="debug"/><!-- 修改此处扫描包名 --> </springProfile> <root level="debug"> <appender-ref ref="CONSOLE" /> <appender-ref ref="DEBUG_FILE" /> <appender-ref ref="INFO_FILE" /> <appender-ref ref="WARN_FILE" /> <appender-ref ref="ERROR_FILE" /> </root> <!--4.2 生产环境:输出到文档--> <springProfile name="pro"> <root level="info"> <appender-ref ref="CONSOLE" /> <appender-ref ref="DEBUG_FILE" /> <appender-ref ref="INFO_FILE" /> <appender-ref ref="ERROR_FILE" /> <appender-ref ref="WARN_FILE" /> </root> </springProfile> </configuration> 使用------------ package com.day.controller; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 @Slf4j @Component public class Consumer implements Serializable { @RabbitListener(queues="QD") public void receiveD(Message message, Channel channel) throws Exception{ byte[] body=message.getBody(); String msg=new String(body, Charset.forName("UTF-8")); log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(),msg); } }
//三.发布订阅
//1.延迟队列优化 //(1)每个队列只对应一个延迟,如果面对变化的需求,怎么解决呢? //(2)写个传ttlTime的控制方法,但是发现发送20秒 发送2秒,都是一个时间接受到,为什么? //(3)RMQ只会检查第一个消息是否过期,就算是第二个消息延迟很短,第二个消息也不会优先执行,怎么弥补呢? //(4)延迟队列(基于插件的)下载插件rabbitmq_delayed_message_exchange-3.8.0.ez 百度网盘找 //(5)放置到/usr/local/rabbitmq/plugins目录下 //(6)安装 rabbitmq-plugins enable rabbitmq_delayed_message_exchange //(7)rabbitmqctl stop(停止) rabbitmq-server -detached(启动) //(8)重启看到x-delayed-message就代表成功。 //(9)写代码 ------------配置 package com.day.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.CustomExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayedQueueConfig { //交换机 队列 routingKey //队列 public static final String DELAYED_QUEUE_NAME="delayed.queue"; //交换机 public static final String DELAYED_EXCHANGE_NAME="delayed.exchange"; //routingKey public static final String DELAYED_ROUTING_KEY="delayed.routingKey"; //声明交换机 @Bean public CustomExchange delayedExchange(){ Map<String,Object> arguments=new HashMap<>(); arguments.put("x-delayed-type","direct"); //交换机名称、类型、持久化、自动删除、其他参数 return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message", true,false,arguments); } //队列 @Bean public Queue delayedQueue(){ return new Queue(DELAYED_QUEUE_NAME); } //绑定 @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange){ return BindingBuilder.bind(delayedQueue).to(delayedExchange) .with(DELAYED_ROUTING_KEY).noargs(); } } -------------生产者 package com.day.controller; import com.day.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 @Slf4j @RestController @RequestMapping("/ttl") public class ProducerController implements Serializable{ @Resource private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ //System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message); log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(),message); rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列: "+message); rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列: "+message); } //开始发消息 消息 TTL @GetMapping("/sendExpireMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}", new Date(),ttlTime,message); rabbitTemplate.convertAndSend("X","XC",message,msg ->{ msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } //开始发消息,基于插件的 消息及延迟的时间 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message,@PathVariable int delayTime){ log.info("当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue:{}", new Date(),delayTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{ msg.getMessageProperties().setDelay(delayTime); return msg; }); } } ------------消费者 package com.day.controller; import com.day.config.DelayedQueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 @Slf4j @Component public class Consumer implements Serializable { @RabbitListener(queues= DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayQueue(Message message, Channel channel) throws Exception{ byte[] body=message.getBody(); String msg=new String(body, Charset.forName("UTF-8")); log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(),msg); } }
//2.小结 //(1)推荐使用RMQ解决延迟队列问题
四.发布确认高级
//1.概述 //(1)RMQ重启期间,生产者消息投递失败,导致消息丢失,需要手动处理和恢复。 //(2)交换机和队列有一个不在,都会导致消息的丢失。 //(3)如果交换机收不到消息应该如何处理? //(4)生产者通过回调接口感知交换机是否接受消息成功。 -------确认 package com.day.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { //交换机 public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange"; //队列 public static final String CONFIRM_QUEUE_NAME="confirm_queue"; //routingKey public static final String CONFIRM_ROUTING_KEY="key1"; //声明交换机 @Bean public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } //队列 @Bean public Queue confirmQueue(){ return new Queue(CONFIRM_QUEUE_NAME); } //绑定 @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue, @Qualifier("confirmExchange")DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } } ------------回调配置 package com.day.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Resource private RabbitTemplate rabbitTemplate; //注入 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } //交换机确认回调方法 //1.发消息 交换机接收到了 会回调 //correlationData 保存回调消息的ID及相关信息 // b true 交换机收到了消息 // s 失败的原因,成功时为null //2.发消息 交换机接收失败了,也会回调 // b为false @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String ID; if(correlationData!=null){ ID= correlationData.getId(); }else{ ID=""; } if(ack){ log.info("交换机已经收到消息,ID为{}的消息",ID); }else{ log.info("交换机还未收到消息,ID为{}的消息,原因为{}",ID,cause); } } } ------------yml文件配置 spring: #devtools: #restart: #enabled=true: #支持热部署 可能导致重启,然后非实时语音转写报错。 rabbitmq: host: wdfgdzx.top port: 5672 username: xlliu24 password: s19911009! publisher-confirm-type: correlated ------------生产者 package com.day.controller; import com.day.config.ConfirmConfig; import com.day.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 @Slf4j @RestController @RequestMapping("/ttl") public class ProducerController implements Serializable{ @Resource private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ //System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message); log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(),message); rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列: "+message); rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列: "+message); } //开始发消息 消息 TTL @GetMapping("/sendExpireMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}", new Date(),ttlTime,message); rabbitTemplate.convertAndSend("X","XC",message,msg ->{ msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } //开始发消息,基于插件的 消息及延迟的时间 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message,@PathVariable int delayTime){ log.info("当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue:{}", new Date(),delayTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{ msg.getMessageProperties().setDelay(delayTime); return msg; }); } //开始发送消息 测试确认 @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData=new CorrelationData("110161"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message,correlationData); log.info("发送消息内容为:{}",message); } } ----------消费者 package com.day.controller; import com.day.config.ConfirmConfig; import com.day.config.DelayedQueueConfig; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 @Slf4j @Component public class Consumer implements Serializable { @RabbitListener(queues= ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMessage(Message message, Channel channel) throws Exception{ byte[] body=message.getBody(); String msg=new String(body, Charset.forName("UTF-8")); log.info("当前时间:{},接收到的队列confirm.queue消息:{}", new Date().toString(),msg); } }
//2.回退消息 //(1)如果发现交换机和信道之间不可路由,要通过设置Mandatory参数可以在不可送达时送回给生产者。 ------------生产者 package com.day.controller; import com.day.config.ConfirmConfig; import com.day.config.DelayedQueueConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 @Slf4j @RestController @RequestMapping("/ttl") public class ProducerController implements Serializable{ @Resource private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message){ //System.out.println("当前时间: "+new Date().toString()+"发送一条信息给两个TTL队列 "+message); log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(),message); rabbitTemplate.convertAndSend("X","XA", "消息来自ttl为10s的队列: "+message); rabbitTemplate.convertAndSend("X","XB", "消息来自ttl为40s的队列: "+message); } //开始发消息 消息 TTL @GetMapping("/sendExpireMsg/{message}/{ttlTime}") public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ log.info("当前时间:{},发送一条时长{}ms信息给TTL队列QC:{}", new Date(),ttlTime,message); rabbitTemplate.convertAndSend("X","XC",message,msg ->{ msg.getMessageProperties().setExpiration(ttlTime); return msg; }); } //开始发消息,基于插件的 消息及延迟的时间 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendMsg(@PathVariable String message,@PathVariable int delayTime){ log.info("当前时间:{},发送一条时长{}ms信息给延迟队列delayed.queue:{}", new Date(),delayTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg ->{ msg.getMessageProperties().setDelay(delayTime); return msg; }); } //开始发送消息 测试确认 @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData=new CorrelationData("110161"); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY+"123",message,correlationData); log.info("发送消息内容为:{}",message); } } ------------回调配置 package com.day.config; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; @Slf4j @Component public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback { @Resource private RabbitTemplate rabbitTemplate; //注入 @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } //交换机确认回调方法 //1.发消息 交换机接收到了 会回调 //correlationData 保存回调消息的ID及相关信息 // b true 交换机收到了消息 // s 失败的原因,成功时为null //2.发消息 交换机接收失败了,也会回调 // b为false @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String ID; if(correlationData!=null){ ID= correlationData.getId(); }else{ ID=""; } if(ack){ log.info("交换机已经收到消息,ID为{}的消息",ID); }else{ log.info("交换机还未收到消息,ID为{}的消息,原因为{}",ID,cause); } } //在消息不可送达时,将消息返回给生产者,只有失败的时候才会回退 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息{},被交换机{}退回,退回原因:{},路由key:{}", new String(message.getBody()),exchange,replyText,routingKey); } } ------------yml配置 spring: #devtools: #restart: #enabled=true: #支持热部署 可能导致重启,然后非实时语音转写报错。 rabbitmq: host: wdfgdzx.top port: 5672 username: xlliu24 password: s19911009! publisher-confirm-type: correlated publisher-returns: true
//3.备份交换机 //(1)先写交换机、路由、队列的绑定关系 //(2)再写消费者,然后删除原有的确认交换机(因为他会转发,和之前的不同了) ------交换机之间关系配置 package com.day.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { //交换机 public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange"; //队列 public static final String CONFIRM_QUEUE_NAME="confirm_queue"; //routingKey public static final String CONFIRM_ROUTING_KEY="key1"; //备份交换机 public static final String BACKUP_EXCHANGE_NAME="backup_exchange"; //备份队列 public static final String BACKUP_QUEUE_NAME="backup_queue"; //报警队列 public static final String WARNING_QUEUE_NAME="warning_queue"; //声明交换机 @Bean public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build(); } //队列 @Bean public Queue confirmQueue(){ return new Queue(CONFIRM_QUEUE_NAME); } //绑定 @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue")Queue confirmQueue, @Qualifier("confirmExchange")DirectExchange confirmExchange){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } //备份交换机 @Bean public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } //队列 @Bean public Queue backupQueue(){ return new Queue(BACKUP_QUEUE_NAME); } @Bean public Queue warningQueue(){ return new Queue(WARNING_QUEUE_NAME); } @Bean public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue")Queue backupQueue, @Qualifier("backupExchange")FanoutExchange backupExchange){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding warningQueueBindingBackupExchange(@Qualifier("warningQueue")Queue warningQueue, @Qualifier("backupExchange")FanoutExchange backupExchange){ return BindingBuilder.bind(warningQueue).to(backupExchange); } } -------报警消费者 package com.day.controller; import com.day.config.ConfirmConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.Date; //报警消费者 @Slf4j @Component public class WarningConsumer { @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarningMsg(Message message){ String msg=new String(message.getBody()); log.error("报警,发现不可路由消息:{}",msg); log.info("当前时间:{},接收到的队列warning_queue消息:{}", new Date().toString(),msg); } }
//4.RMQ其他知识点 //(1)消息被重复消费,重复扣了用户的钱。幂等性问题 //(2)幂等性问题的解决一般使用全局ID,使用该ID判断该消息是否已消费过。 //(3)唯一ID+指纹码,或利用redis的原子性去实现 //(4)利用redis执行setnx命令,天然具有幂等性,从而实现不重复消费。
//5.优先级队列 //(1)订单催付场景 RMQ进行改造和优化,对大客户的订单进行优先级的提升。 //(2)生产者先把消息发到队列之中,然后消费者再消费。 ------交换机、队列、路由配置 package com.day.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class CommonConfig { //交换机 public static final String EXCHANGE_NAME="exchange"; //队列 public static final String QUEUE_NAME="queue"; //routingKey public static final String ROUTING_KEY="key"; //声明交换机 @Bean public DirectExchange exchange(){ return new DirectExchange(EXCHANGE_NAME); } //队列 @Bean public Queue queue(){ Map<String,Object> arguments=new HashMap<>(); //队列 arguments.put("x-max-priority",10);//官方允许是0-255 此处设置10 允许0-10 不用设置过大 浪费CUP和内存 return QueueBuilder.durable(QUEUE_NAME) .withArguments(arguments) .build(); } //绑定 @Bean public Binding queueBindingExchange(@Qualifier("queue")Queue queue, @Qualifier("exchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY); } } ------生产者 package com.day.controller; import com.day.config.CommonConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; import java.io.Serializable; import java.util.Date; //发送延迟消息 //打印日志的注解 @Slf4j @RestController @RequestMapping("/RMQ") public class ProducerController implements Serializable{ @Resource private RabbitTemplate rabbitTemplate; //开始发消息 @GetMapping("/sendMessage/{message}") public void sendMsg(@PathVariable String message){ for(int i=1;i<11;i++){ if(i==5){ rabbitTemplate.convertAndSend( CommonConfig.EXCHANGE_NAME, CommonConfig.ROUTING_KEY, "生产者生产消息:"+message+i, msg -> { msg.getMessageProperties().setPriority(5); return msg; }); log.info("当前时间:{},发送一条信息给队列:{}", new Date(),message+i); }else{ rabbitTemplate.convertAndSend( CommonConfig.EXCHANGE_NAME, CommonConfig.ROUTING_KEY, "生产者生产消息:"+message+i); log.info("当前时间:{},发送一条信息给队列:{}", new Date(),message+i); } } } } ------消费者 package com.day.controller; import com.day.config.CommonConfig; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.Serializable; import java.nio.charset.Charset; import java.util.Date; //消费者 /* @Slf4j @Component public class Consumer implements Serializable { @RabbitListener(queues= CommonConfig.QUEUE_NAME) public void receiveConfirmMessage(Message message, Channel channel) throws Exception{ byte[] body=message.getBody(); String msg=new String(body, Charset.forName("UTF-8")); log.info("当前时间:{},接收到的队列confirm.queue消息:{}", new Date().toString(),msg); } } */ //接收消息 public class Consumer { //接收消息 public static void main(String[] args) throws Exception { //创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setHost("47.105.174.97"); connectionFactory.setUsername("xlliu24"); connectionFactory.setPassword("s19911009!"); //创建新链接 Connection connection=connectionFactory.newConnection(); Channel channel=connection.createChannel(); //声明 接受消息 DeliverCallback deliverCallback=(consumerTag, message) ->{ System.out.println(new String(message.getBody())); }; //取消消费 CancelCallback cancelCallback= consumerTage ->{ System.out.println("消费消息被中断..."); }; //消费者消费消息 /** * 1.消费哪个队列 * 2.消费成功之后是否要自动应答 true代表自动应答 false代表手动应答 * 3.消费者未成功消费的回调 * 4.消费者取消消费的回调 * * */ channel.basicConsume(CommonConfig.QUEUE_NAME,true,deliverCallback,cancelCallback); } }
//6.惰性队列 //(1)消息保存在内存中还是在磁盘上,正常消息是保存在内存中。在惰性中,消息是保存在磁盘中的。 //(2)当有消费者宕机、大量消息积压时,才用惰性队列。
//五.集群
//1.集群原理 //(1)集群可以不断的扩充,需要有3台机器 //(2)修改三台机器的RMQ名称,分别为node1 node2 node3 //(3)vim /etc/hosts IP 节点名称对应。三个机器都需要这么配置 //(4)远程复制命令 scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie //(5)重启RMQ与erlang rabbitmq-server -detached //(6)节点二操作rabbitmqctl stop_app ; rabbitmqctl reset ;rabbitmqctl join_cluster rabbit@node1 ;rabbitmqctl start_app ;节点三操作相同 //(7)rabbitmqctl cluster_status 查看集群状态 //(8)rabbitmqctl add_user admin 123 ;rabbitmqctl set_user_tags admin administrator //(9)rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" //(10)登录后在Overview中可以看到节点 //(11)也可以解除集群节点
//2.镜像队列 //(1)如果有个节点宕机了,重启发现消息丢失了,镜像队列就是备份。 //(2)发给1节点,但备份到2号节点。通过85集的设置可以实现。就算整个集群只剩下一台机器,也可以处理。
//3.负载均衡 //(1)Haproxy实现负载均衡(比如Nginx),实现高并发高可用
//4.联邦交换机 //(1)异地机房网络延迟的问题,北京ExchangeA 深圳ExchangeB,如果北京的用户访问深圳的RMQ怎么办? //(2)在每台机器上开启federation相关插件 //(3)rabbitmq-plugins enable rabbitmq_faderation ;rabbitmaq-plugins enable rabbitmq_federation_management ; 自带的插件,能看到 Federation Status ;Federation Upstarems //(4)联邦队列 也可以同步数据,交换机也可以实现。
//5.Shovel //(1)铲子,可以将数据从一端挖到另一端 //(2)rabbitmq-plugins enable rabbitmq_shovel rabbitmq-plugins enable rabbitmq_shovel_management //(3)node1 q1 同步到node2 q2中 实现跨地区数据同步