RabbitMQ-默认读、写方式介绍
RabbitMQ-发布/订阅模式
目录
1、概述
2、直连交换机
3、多重绑定
4、具体代码实现
4.1 生产者部分
4.2 消费者部分
5、运行代码
6、总结
1、概述
直连交换机,可以实现类似路由的功能,消息从交换机发送到哪个队列,直连交换机是支持配置的,他可以根据不同的routing key将消息转发到不同的队列当中。
在上一篇《RabbitMQ-发布/订阅模式》中,介绍过绑定过程,类似:
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil)
binding,就是建立起了交换机与队列之间的关系,什么样子的message路由到哪个队列,就是由绑定决定的,在rabbitmq的官方文档中,为了避免和Channel.Publish函数的key参数混淆,在bind函数中的routing key称之为binding key,比如:
err = ch.QueueBind(
q.Name, // queue name
"black", // binding key
"logs", // exchange
false,
nil)
在上面的代码中,routing key参数,在扇形交换机是无效的,这点大家要注意。
2、直连交换机
扇形交换机实现了无脑将信息广播到所有队列当中,如果我们想对消息根据一定的规则进行过滤,不同的消息入不同的队列,扇形交换机就无法实现这个功能了,这个时候就需要使用直连交换了。
上图,声明了直连交换机, 并将两个队列绑定到该交换机上,第一个队列的binding key为【orange】,第二个队列设计了两个绑定,第一个binding key为【black】,另外一个为【green】,在这种设计下,routing key为【orange】的消息将会被路由到Q1队列,routing key为【black】【green】的消息将会被路由到Q2队列,其他类型的消息就会被丢弃。
3、多重绑定
在这种模式下,其实现的功能类似扇形交换机,交换机可以将同一个消息路由到多个队列当中。
在上图的设计方式中,routing key为【black】的消息会同时路由到Q1和Q2两个队列中。
4、具体代码实现
4.1 生产者部分
第一步,和扇形交换机一样,声明交换机:
err = ch.ExchangeDeclare(
"logs_direct", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
第二步,发送消息:
body := "Hello World by dircet exchange"
err = ch.Publish(
"logs", // exchange
"info", // routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
4.2 消费者部分
声明队列:
q, err := ch.QueueDeclare(
"logs_direct", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
绑定:
err = ch.QueueBind(
q.Name, // queue name
"info", // routing key(binding key)
"logs", // exchange
false,
nil,
)
5、运行代码
生产者部分全部代码:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
err = ch.ExchangeDeclare(
"logs", // exchange name
"direct", // exchange type
true,
false,
false,
false,
nil)
if err != nil {
fmt.Println("Failed to declare an exchange")
return
}
body := "Hello World by dircet exchange"
err = ch.Publish(
"logs", // exchange
"info", // routing key
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
if err != nil {
fmt.Println("Failed to publish a message")
return
}
}
消费者部分全部代码:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ")
return
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
fmt.Println("Failed to open a channel")
return
}
err = ch.ExchangeDeclare("logs", "direct", true, false, false, false, nil)
if err != nil {
fmt.Println("Failed to declare an exchange")
return
}
q, err := ch.QueueDeclare(
"logs_direct", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
err = ch.QueueBind(
q.Name, // queue name
"info", // routing key(binding key)
"logs", // exchange
false,
nil,
)
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
var forever chan struct{}
go func() {
for d := range msgs {
fmt.Printf(" [x] %s\n", d.Body)
}
}()
fmt.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
}
启动消费者,程序启动后,从RabbitMQ控制台就会看到一个队列:
之后运行生产者部分代码,生产者发送消息后,消费者侧就会接收到生产者发来的消息:
6、总结
以上就是rabbitmq直连交换机的使用方式,示例代码只是做了简单的演示,对于多重绑定,各种路由规则可以自行尝试,直连交换机模式,为开发者提供了灵活的路由规则,推荐使用。