RabbitMQ 的基本概念:vhost、broker、producer、consumer、exchange、queue、routing key
RabbitMQ 常用的消息模式:工作队列(简单队列)、发布/订阅(pub/sub,fanout 交换机)、路由(routing,direct 交换机)、主题(topic 交换机)模式
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency>
public class RabbitmqClientDemo {
/** Shared factory for every demo; its connection settings are applied in the static initializer below. */
private static final ConnectionFactory factory = new ConnectionFactory();

// Exchange and queue used by the fanout (publish/subscribe) demo.
private static final String EXCHANGE_NAME = "exchange.fanout";
private static final String FANOUT_QUEUE = "queue.fanout";

// NOTE: the topic demo declares this exchange with type "topic" while the dead-message demo
// declares it with type "direct", so the same exchange name is declared with two different types.
private static final String DIRECT_EXCHANGE = "exchange_direct";
private static final String QUEUE_DIRCT = "queue.direct.02";

// Queues and routing keys used by the topic demo.
private static final String QUEUE_TOPIC_ONE = "queue.topic.01";
private static final String QUEUE_TOPIC_TWO = "queue.topic.02";
private static final String QUEUE_TOPIC_THREE = "queue.topic.03";
private static final String ROUNTING_KEY_ONE = "routing.key.01";
private static final String ROUNTING_KEY_TWO = "routing.key.02";
private static final String ROUNTING_KEY_THREE = "routing.key.03";

// Exchange and queue used by the dead-message demo.
private static final String DEAD_MESSAGE_EXCHANGE = "EXCHANGE_DEAD";
private static final String DEAD_QUEUE = "queue.dead";

static {
    // Hard-coded broker address and credentials for the demo environment.
    factory.setHost("192.168.233.128");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
}
/**
 * Opens a new broker connection through the shared {@code factory}.
 *
 * <p>Every call creates a fresh connection; nothing is cached or pooled here, so the caller
 * owns the returned connection.
 *
 * @return the newly opened connection
 * @throws IOException      propagated from {@code factory.newConnection()}
 * @throws TimeoutException propagated from {@code factory.newConnection()}
 */
public static Connection getConnection() throws IOException, TimeoutException {
    Connection opened = factory.newConnection();
    return opened;
}
/**
 * Opens a new connection via {@link #getConnection()} and creates one channel on it.
 *
 * <p>The connection itself is not returned, so the caller receives only the channel.
 *
 * @return a channel created on a freshly opened connection
 * @throws IOException      propagated from opening the connection or creating the channel
 * @throws TimeoutException propagated from opening the connection
 */
public static Channel createChannel() throws IOException, TimeoutException {
    return getConnection().createChannel();
}
/**
 * Entry point: starts the dead-message producer thread, then the dead-message consumer thread.
 *
 * @param args unused
 */
public static void main(String[] args) {
    // Other demos, disabled; swap them in to try a different messaging pattern:
    //   new WorkQueueProducer().start();
    //   new WorkerConsumer().start();
    //   new PublishConsumer().start();
    //   new PublishProducer().start();
    //   new TopicProducer().start();
    //   new TopicConsumer().start();
    Thread producer = new DeadMessageProducer();
    Thread consumer = new DeadMessageConsumer();
    producer.start();
    consumer.start();
}
/**
 * Work-queue demo producer: declares the non-durable queue {@code "hello"} and publishes a
 * single {@code "hello"} message to it through the default exchange.
 */
static class WorkQueueProducer extends Thread {
    @Override
    public void run() {
        // try-with-resources closes the channel and then the connection once the message is
        // sent; only one channel is created.
        try (Connection connection = getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare("hello", false, false, false, null);
            // Encode explicitly so the payload does not depend on the platform default charset.
            channel.basicPublish("", "hello", null, "hello".getBytes(StandardCharsets.UTF_8));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Work-queue demo consumer: subscribes to the {@code "hello"} queue and prints each message body.
 *
 * <p>Messages are acknowledged manually after they are printed. The {@code basicQos(1)} limit
 * counts unacknowledged deliveries, so it only takes effect when auto-ack is off.
 */
static class WorkerConsumer extends Thread {
    @Override
    public void run() {
        try {
            // Connection and channel stay open on purpose: the subscription lives on them.
            Connection connection = getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare("hello", false, false, false, null);
            // At most one unacknowledged message is delivered to this consumer at a time.
            channel.basicQos(1);
            DeliverCallback onDelivery = (consumerTag, delivery) -> {
                System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
                // Acknowledge only this delivery (multiple = false) once it has been handled.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            channel.basicConsume("hello", false, onDelivery, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Publish/subscribe demo producer: declares the fanout exchange and its queue, binds them, and
 * publishes 40 order messages to the exchange.
 */
static class PublishProducer extends Thread {
    @Override
    public void run() {
        // try-with-resources closes the channel and then the connection after publishing.
        try (Connection connection = getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);
            channel.queueBind(FANOUT_QUEUE, EXCHANGE_NAME, "");
            Random random = new Random();
            for (int i = 1; i <= 40; i++) {
                // %s, not %d: the arguments are a UUID and a Double, and %d rejects both with
                // IllegalFormatConversionException.
                String message = String.format("current orderId is %s, money is %s",
                        UUID.randomUUID(), random.nextDouble());
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Publish/subscribe demo consumer: declares the fanout exchange and queue, binds them, and
 * prints every message delivered to the queue.
 */
static class PublishConsumer extends Thread {
    @Override
    public void run() {
        try {
            // The channel (and its connection) stay open on purpose: the subscription lives on them.
            Channel channel = createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            channel.queueDeclare(FANOUT_QUEUE, true, false, false, null);
            channel.queueBind(FANOUT_QUEUE, EXCHANGE_NAME, "");
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
            };
            // basicConsume registers the consumer and returns; deliveries then arrive through the
            // callback. Call it once: looping would register a new consumer on every iteration.
            channel.basicConsume(FANOUT_QUEUE, true, deliverCallback, consumerTag -> {
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Topic demo producer: declares a topic exchange with three durable queues, binds them, and
 * publishes 10 order messages, rotating through the three routing keys.
 *
 * <p>Bindings: queue one receives key one, queue two receives key two, and queue three receives
 * all three keys.
 */
static class TopicProducer extends Thread {
    @Override
    public void run() {
        // try-with-resources closes the channel and then the connection after publishing.
        try (Connection connection = getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(DIRECT_EXCHANGE, "topic");
            channel.queueDeclare(QUEUE_TOPIC_ONE, true, false, false, null);
            channel.queueDeclare(QUEUE_TOPIC_TWO, true, false, false, null);
            channel.queueDeclare(QUEUE_TOPIC_THREE, true, false, false, null);
            channel.queueBind(QUEUE_TOPIC_ONE, DIRECT_EXCHANGE, ROUNTING_KEY_ONE);
            channel.queueBind(QUEUE_TOPIC_TWO, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);
            channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_THREE);
            channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_ONE);
            channel.queueBind(QUEUE_TOPIC_THREE, DIRECT_EXCHANGE, ROUNTING_KEY_TWO);
            Random random = new Random();
            for (int i = 1; i <= 10; i++) {
                String message = String.format("current orderId is %s, money is %s",
                        UUID.randomUUID().toString(), random.nextDouble());
                // Pick the routing key (and the matching log line) from the message index.
                String routingKey;
                String logLine;
                if (i % 3 == 0) {
                    routingKey = ROUNTING_KEY_ONE;
                    logLine = "send to topic1";
                } else if (i % 3 == 1) {
                    routingKey = ROUNTING_KEY_TWO;
                    logLine = "send to topic2";
                } else {
                    routingKey = ROUNTING_KEY_THREE;
                    logLine = "send to topic3";
                }
                System.out.println(logLine);
                channel.basicPublish(DIRECT_EXCHANGE, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
static class TopicConsumer extends Thread {
@Override
public void run() {
try {
Channel channel = createChannel();
channel.exchangeDeclare(DIRECT_EXCHANGE, "topic");
channel.queueDeclare(QUEUE_TOPIC_THREE, true, false, false, null);
channel.queueBind(QUEUE_TOPIC_THREE,EXCHANGE_NAME,"routing.key.*")
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
System.out.println(delivery.getEnvelope().getRoutingKey());
System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
};
while (true) {
channel.basicConsume(QUEUE_TOPIC_THREE, true, deliverCallback, consumerTag -> {
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
//channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
static class DeadMessageProducer extends Thread {
@Override
public void run() {
try {
Channel channel = createChannel();
channel.exchangeDeclare(DIRECT_EXCHANGE, "direct");
channel.queueDeclare(DEAD_QUEUE, true, false, false, null);
channel.queueBind(DEAD_QUEUE, DEAD_MESSAGE_EXCHANGE, "routing.direct02");
for (int i = 1; i <= 40; i++) {
String message = String.format("current orderId is %s, money is %s", UUID.randomUUID().toString(), new Random().nextDouble());
AMQP.BasicProperties prop = new AMQP.BasicProperties().builder().expiration("30000").build();
channel.basicPublish(DIRECT_EXCHANGE, "routing.direct02", prop, message.getBytes(StandardCharsets.UTF_8));
System.out.println("send to topic1");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
 * Dead-message demo consumer: binds {@code QUEUE_DIRCT} to the direct exchange under
 * {@code "routing.direct02"} and prints the routing key and body of every delivered message.
 */
static class DeadMessageConsumer extends Thread {
    @Override
    public void run() {
        try {
            // The channel (and its connection) stay open on purpose: the subscription lives on them.
            Channel channel = createChannel();
            // Disabled dead-letter setup, kept for reference. It declares QUEUE_DIRCT with a
            // dead-letter exchange, a dead-letter routing key and a maximum length of 2:
            //   channel.exchangeDeclare(DEAD_MESSAGE_EXCHANGE, "direct");
            //   Map<String, Object> deadLetterParams = new HashMap<>(2);
            //   deadLetterParams.put("x-dead-letter-exchange", DEAD_MESSAGE_EXCHANGE);
            //   deadLetterParams.put("x-dead-letter-routing-key", "routing.dead02");
            //   deadLetterParams.put("x-max-length", 2);
            //   channel.queueDeclare(QUEUE_DIRCT, true, false, false, deadLetterParams);
            //   channel.queueBind(DEAD_QUEUE, DEAD_MESSAGE_EXCHANGE, "routing.dead02");
            channel.exchangeDeclare(DIRECT_EXCHANGE, "direct");
            // NOTE(review): QUEUE_DIRCT is not declared on this code path; presumably it already
            // exists on the broker (e.g. created by the disabled setup above) - confirm before running.
            channel.queueBind(QUEUE_DIRCT, DIRECT_EXCHANGE, "routing.direct02");
            DeliverCallback callback = (consumerTag, delivery) -> {
                System.out.println(delivery.getEnvelope().getRoutingKey());
                System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
            };
            // Disabled alternative callback, kept for reference: reject each delivery without
            // requeueing it.
            //   channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            // basicConsume registers the consumer and returns; deliveries then arrive through the
            // callback. Call it once: looping would register a new consumer on every iteration.
            channel.basicConsume(QUEUE_DIRCT, true, callback, (consumerTag) -> {
                System.out.println(consumerTag + "消费者取消消费消息");
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
整合 Spring Boot
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>3.1.5</version> </dependency>
RabbitMQ 的核心配置(相比于其他 MQ,RabbitMQ 自带图形化管理界面,很多操作可以直接在界面上完成)
https://blog.csdn.net/leesinbad/article/details/128670794