一、创建一个 Spring Boot 项目
1、引入依赖
<!-- Spring Boot starter for AMQP messaging; brings in the Spring AMQP / RabbitMQ client support used below.
     NOTE(review): no <version> is declared here, so it presumably comes from the Spring Boot parent/BOM - confirm in pom.xml. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、配置RabbitMQ连接
1、在 application.properties 或 application.yml 中配置 RabbitMQ 服务器的连接参数:
# RabbitMQ connection settings (spring.rabbitmq.* keys).
# Broker host and port.
spring.rabbitmq.host=59.110.28.73
spring.rabbitmq.port=5672
# NOTE(review): guest/guest are RabbitMQ's default credentials. By default the broker only
# accepts the guest user over localhost, so a remote host such as the one above normally
# needs a dedicated user - confirm against the broker's configuration.
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# Virtual host to connect to.
spring.rabbitmq.virtual-host=/
三、创建一个消费者
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.consumer;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queue, direct exchange and binding used by the direct-exchange example,
 * and registers two listener methods on that queue which print each received text message.
 *
 * <p>Project: spring-boot-rabbitMQ - DirectConsumer</p>
 * <p>Powered by scl On 2024-04-07 16:57:20</p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Configuration
public class DirectConsumer {

    /** Name of the durable queue that both listeners read from. */
    private static final String QUEUE_NAME = "Direct_Q01";

    /** Name of the direct exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "Direct_E01";

    /** Routing key that links the exchange to the queue. */
    private static final String ROUTING_KEY = "RK01";

    /**
     * Registers the queue.
     *
     * @return a durable queue named {@code Direct_Q01}
     */
    @Bean
    public Queue queue() {
        return QueueBuilder
                .durable(QUEUE_NAME)
                .build();
    }

    /**
     * Registers the exchange.
     *
     * @return a direct exchange named {@code Direct_E01}
     */
    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder
                .directExchange(EXCHANGE_NAME)
                .build();
    }

    /**
     * Binds the queue to the exchange.
     *
     * @return a binding from {@link #exchange()} to {@link #queue()} with routing key {@code RK01}
     */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange sourceExchange = exchange();
        return BindingBuilder.bind(boundQueue).to(sourceExchange).with(ROUTING_KEY);
    }

    /**
     * First listener on {@code Direct_Q01}; prints the message to standard output.
     *
     * @param msg the received message payload
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(String msg) {
        String line = "消费者1:" + msg;
        System.out.println(line);
    }

    /**
     * Second listener on the same queue; prints the message to standard output.
     * NOTE(review): with two listeners on one queue the broker is expected to share
     * messages between them rather than deliver each message twice - confirm against the test output.
     *
     * @param msg the received message payload
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage1(String msg) {
        String line = "消费者2:" + msg;
        System.out.println(line);
    }
}
四、创建一个生产者
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.provider;
import com.by.consumer.OrderKO;
import org.springframework.amqp.rabbit.connection.RabbitAccessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Producer side of the direct-exchange example: publishes messages to exchange
 * {@code Direct_E01} using routing key {@code RK01}.
 *
 * <p>Project: spring-boot-rabbitMQ - DirectProvider</p>
 * <p>Powered by scl On 2024-04-07 17:06:41</p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Service
public class DirectProvider {

    /** Exchange every message is published to. */
    private static final String EXCHANGE_NAME = "Direct_E01";

    /** Routing key attached to every published message. */
    private static final String ROUTING_KEY = "RK01";

    /** Template used to convert and publish messages; injected by Spring. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Converts the given object to a message and publishes it.
     *
     * @param message the payload to send; it is handed to
     *                {@code RabbitTemplate.convertAndSend} unchanged
     */
    public void send(Object message) {
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);
    }
}
五、测试代码
package com.by;
import com.by.consumer.OrderKO;
import com.by.provider.DirectProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
/**
 * Spring Boot test that publishes five text messages through {@link DirectProvider},
 * one per second. It makes no assertions; results are observed from the listeners' console output.
 */
@SpringBootTest
class AppTest {

    /** Producer under test; injected by Spring. */
    @Autowired
    private DirectProvider directProvider;

    /**
     * Sends the messages {@code "接收消息1"} through {@code "接收消息5"}.
     *
     * @throws InterruptedException if the pause between sends is interrupted
     * @throws IOException declared by the original signature; nothing in the body currently throws it
     */
    @Test
    void test() throws InterruptedException, IOException {
        int seq = 1;
        while (seq <= 5) {
            String payload = "接收消息" + seq;
            directProvider.send(payload);
            // Pause one second after each send (a disabled println of the sent message used to sit here).
            Thread.sleep(1000);
            seq++;
        }
        // A disabled System.in.read() used to sit here, presumably to keep the test
        // alive while the listeners drain the queue.
    }
}
六、测试结果
七、发送一个对象(String ---> 对象)
1、创建消费者
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.consumer;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the queue, direct exchange and binding used by the direct-exchange example,
 * and registers two listener methods on that queue which receive {@code OrderKO} payloads
 * and print them.
 *
 * <p>Project: spring-boot-rabbitMQ - DirectConsumer</p>
 * <p>Powered by scl On 2024-04-07 16:57:20</p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Configuration
public class DirectConsumer {

    /** Name of the durable queue that both listeners read from. */
    private static final String QUEUE_NAME = "Direct_Q01";

    /** Name of the direct exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "Direct_E01";

    /** Routing key that links the exchange to the queue. */
    private static final String ROUTING_KEY = "RK01";

    /**
     * Registers the queue.
     *
     * @return a durable queue named {@code Direct_Q01}
     */
    @Bean
    public Queue queue() {
        return QueueBuilder
                .durable(QUEUE_NAME)
                .build();
    }

    /**
     * Registers the exchange.
     *
     * @return a direct exchange named {@code Direct_E01}
     */
    @Bean
    public DirectExchange exchange() {
        return ExchangeBuilder
                .directExchange(EXCHANGE_NAME)
                .build();
    }

    /**
     * Binds the queue to the exchange.
     *
     * @return a binding from {@link #exchange()} to {@link #queue()} with routing key {@code RK01}
     */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        DirectExchange sourceExchange = exchange();
        return BindingBuilder.bind(boundQueue).to(sourceExchange).with(ROUTING_KEY);
    }

    /**
     * First listener on {@code Direct_Q01}; prints the received order to standard output.
     *
     * @param msg the received payload, already converted to an {@code OrderKO}
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage(OrderKO msg) {
        String line = "消费者1:" + msg;
        System.out.println(line);
    }

    /**
     * Second listener on the same queue; prints the received order to standard output.
     * NOTE(review): with two listeners on one queue the broker is expected to share
     * messages between them rather than deliver each message twice - confirm against the test output.
     *
     * @param msg the received payload, already converted to an {@code OrderKO}
     */
    @RabbitListener(queues = QUEUE_NAME)
    public void receiveMessage1(OrderKO msg) {
        String line = "消费者2:" + msg;
        System.out.println(line);
    }
}
2、创建一个对象
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.consumer;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
 * Simple order payload exchanged between the producer and the listeners in this example.
 * Accessors, constructors and the builder are generated by Lombok.
 *
 * <p>Project: spring-boot-rabbitMQ - OrderKO</p>
 * <p>Powered by scl On 2024-04-07 19:13:57</p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class OrderKO implements Serializable {

    /** Numeric identifier of the order. */
    private int id;

    /** Display name carried with the order. */
    private String name;
}
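3、为防止消息内容乱码,编写一个 RabbitMQ 配置类,将消息转换器改为 JSON 格式(默认的 SimpleMessageConverter 会对 Serializable 对象使用 Java 序列化,消息体在管理界面中显示为不可读的二进制内容)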
/*
* Copyright (c) 2020, 2024, All rights reserved.
*
*/
package com.by.provider;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration that exposes a JSON message converter as a bean.
 *
 * <p>Project: spring-boot-rabbitMQ - RabbitConfig</p>
 * <p>Powered by scl On 2024-04-07 19:31:27</p>
 *
 * @author 孙臣龙 [1846080280@qq.com]
 * @version 1.0
 * @since 17
 */
@Configuration
public class RabbitConfig {

    /**
     * Creates the converter used to turn payloads into JSON message bodies and back.
     * NOTE(review): Spring Boot's AMQP auto-configuration is expected to apply this bean to
     * the {@code RabbitTemplate} and the listener containers - confirm for the Boot version in use.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }
}
4、测试
package com.by;
import com.by.consumer.OrderKO;
import com.by.provider.DirectProvider;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.io.IOException;
/**
 * Spring Boot test that publishes five {@link OrderKO} objects through {@link DirectProvider},
 * one per second. It makes no assertions; results are observed from the listeners' console output.
 */
@SpringBootTest
class AppTest {

    /** Producer under test; injected by Spring. */
    @Autowired
    private DirectProvider directProvider;

    /**
     * Builds orders with ids 1 to 5 and names {@code "孙臣龙1"} to {@code "孙臣龙5"}, sending each in turn.
     *
     * @throws InterruptedException if the pause between sends is interrupted
     * @throws IOException declared by the original signature; nothing in the body currently throws it
     */
    @Test
    void test() throws InterruptedException, IOException {
        int seq = 1;
        while (seq <= 5) {
            String orderName = "孙臣龙" + seq;
            OrderKO orderKO = OrderKO.builder()
                    .id(seq)
                    .name(orderName)
                    .build();
            directProvider.send(orderKO);
            // Pause one second after each send (a disabled println of the sent message used to sit here).
            Thread.sleep(1000);
            seq++;
        }
        // A disabled System.in.read() used to sit here, presumably to keep the test
        // alive while the listeners drain the queue.
    }
}
5、结果对比