文章目录
- RabbitMQ 安装使用
- 安装
- 下载 Erlang
- 下载 RabbitMQ 的服务
- 安装好后看是否有 RabbitMQ 的服务
- 开启管理 UI
- RabbitMQ 端口使用一览图
- 使用
- 输出最简单的 Hello World!
- 生产者定义
- 消费者消费消息
- 小拓展
RabbitMQ 安装使用
安装
下载 Erlang
RabbitMQ 是用这个语言写的,这个语言性能好,下载就好了。
下载地址
如果是 Windows 直接点击 Installer 下载傻瓜式下载。
下载的时候一般不放到默认路径,自己选一个自己特定的安装路径。
下载 RabbitMQ 的服务
下载地址
也是直接点击看下图
安装好后看是否有 RabbitMQ 的服务
如果有这样的服务那就是成功啦。
ctrl + R
输入 services.msc
查看服务列表
找到 RabbitMQ 服务,如果在运行,那么就是安装成功了。
开启管理 UI
在目录,打开 sbin 目录
并且在此目录下打开命令行,运行 rabbitmq-plugins enable rabbitmq_management
如果没有什么报错的话就说明管理 UI 的插件已经成功配置。
这时候你只需要打开 localhost:15672/
即可进入管理页面,如下:
一开始进入,如果需要密码的话,那么用户明和密码都是 guest
如果你在服务端需要远程管理,这个账号是登录不了的,需要你自己再去创建一个账号。
RabbitMQ 端口使用一览图
比如 15672 是管理 UI 的端口
RabbitMQ 服务就是运行在端口 5672
使用
这里使用的是 Java 版本
消息队列中的有几个重要的 专有名词,一个是生产者(producer)=》 生产消息,一个是消费者(consumer)=》 消费消息,一个是 broker 对消息进行转发的中间人。还有一个就是 router 路由,如何进行转发。
输出最简单的 Hello World!
输出 Hello World 就是这样的一个简单的模型。
有一个生产者创造一条消息转发到 Broker (这个是由 RabbitMQ 自动完成),一个消费者消费这条消息。
生产者定义
代码中有详细的注释。
package com.xwhking.testcode.TestRabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 创建一个RabbitMQ 的连接工厂
factory.setHost("localhost"); // 设置RabbitMQ 的主机地址
try (Connection connection = factory.newConnection(); // 创建一个连接
Channel channel = connection.createChannel()) { // 创一个管道,用于通信,传输消息。
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 生命一个队列, 第一个参数是队列的名字,第二个参数是是否持久化,第三个参数是是否独占队列,第四个参数是是否自动删除,第五个参数是队列的属性。
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); // 发布消息,第一个参数是交换机名,第二个参数是队列名,第三个参数是消息的属性,第四个参数是消息的内容。传输过程都是以二进制进行传输。
System.out.println(" [x] Sent '" + message + "'");
}
}
}
运行以后就可以在管理界面看到了。
消费者消费消息
package com.xwhking.testcode.TestRabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class Recv {
private final static String QUEUE_NAME = "hello"; // 队列名字
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 创建工厂
factory.setHost("localhost"); // 设置主机
Connection connection = factory.newConnection(); // 获取一个连接
Channel channel = connection.createChannel(); // 获取通道
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明一个队列,参数:队列名,是否持久化,是否独占,是否自动删除,参数
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};// 消费者收到消息的回调函数,参数:消费者标签,消息,回调函数内,就是处理消息的代码
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); // 消费者,参数:队列名,是否自动应答,回调函数,消费者标签
}
}
消费后管理页面
拥有的一条消息就被消费啦。
小拓展
消费者不是启动以后,就是一个阻塞线程吗,如果没有消息的来到,那么线程会一直阻塞,那么我是否能够改一下生产者,能够随时输入消息,让消费者收到呢。
嗯嗯嗯!
说干就干
修改生产者
package com.xwhking.testcode.TestRabbitMQ;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory(); // 创建一个RabbitMQ 的连接工厂
factory.setHost("localhost"); // 设置RabbitMQ 的主机地址
try (Connection connection = factory.newConnection(); // 创建一个连接
Channel channel = connection.createChannel()) { // 创一个管道,用于通信,传输消息。
channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 生命一个队列, 第一个参数是队列的名字,第二个参数是是否持久化,第三个参数是是否独占队列,第四个参数是是否自动删除,第五个参数是队列的属性。
String message = "Hello World!";
Scanner scanner = new Scanner(System.in);
while(true){
message = scanner.nextLine();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); // 发布消息,第一个参数是交换机名,第二个参数是队列名,第三个参数是消息的属性,第四个参数是消息的内容。传输过程都是以二进制进行传输。
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}
大家好,我是xwhking,一名技术爱好者,目前正在全力学习 Java,前端也会一点,如果你有任何疑问请你评论,或者可以加我QQ(2837468248)说明来意!希望能够与你共同进步