RabbitMQ简单使用

 

RabbitMq是一个消息中间件:它接收消息、转发消息。你可以把它理解为一个邮局:当你向邮箱里寄出一封信后,邮递员们就能最终将信送到收信人手中。 

RabbitMq、消息相关术语如下:

  生产者:生产者只发送消息,发送消息的程序即为生产者:

  

  消息队列:消息队列就相当于RabbitMq中的邮箱,消息存储在消息队列中。队列本质上是一个大的消息缓存,它能存多少消息,取决于主机的内存和磁盘限制。多个生产者可以往同一个消息队列中发送消息;多个消费者可以从同一个队列中获取数据。

  

  消费者:消费者是一个等待接收消息的程序:

  

  注意:生产者、消费者和RabbitMq可以在不同的机器上;在很多的应用中,一个生产者同时也可能是消费者。

在下面图形中,“P”是消息的生产者,“C”是消息的消费者,中间的红框是消息队列,保存了从生产者那里接收到的准备转发到消费方的消息。

  

 

Hello World

发送消息

生产者连接RabbitMq,发送一条简单的消息”Hello World!“后就退出。

在Send.java类中,需要引入以下依赖包:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

给队列起个名字:

public class Send {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] argv) throws Exception {
       ...
    }
}

创建连接到服务器的连接Collection:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
     Channel channel = connection.createChannel()) {
    ...
}

这个连接是套接字连接,为我们处理协议版本协商和身份验证等。这里我们连接一个本地的RabbitMq因此是localhost,如果想连接到一个远程的RabbitMq,只需要把localhst改成那台机器的IP地址即可。

创建完连接之后,要继续创建一个信道:Channel。使用try-with-resource表达式,因为Connection和Channel都实现了JAVA接口Closeable,属于资源,需要关闭。使用try-with-resource就不需要显示地在我们的代码中进行关闭了。(关于信道,请参考文章最顶部的RabbitMq原理图,它是TCP里面的虚拟链接,例如:电缆相当于一个TCP,信道就是里面的一个独立光纤,一条TCP上面创建多条信道是没有问题的;TCP一旦打开就会创建AMQP信道;无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的)。

为了发送消息,我们还必须定义一个消息发送到的消息队列:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

队列声明函数queueDeclare有多个参数,其分别是:

1. queue: 队列的名称 

2. durable: 是否持久化 

当durable = false时,队列非持久化。因为队列是存放在内存中的,所以当RabbitMQ重启或者服务器重启时该队列就会丢失 ;
当durable = true时,队列持久化。当RabbitMQ重启后队列不会丢失。RabbitMQ退出时它会将队列信息保存到 Erlang自带的Mnesia数据库 中,当RabbitMQ重启之后会读取该数据库
3. exclusive: 是否排外的 ;

当exclusive = true则设置队列为排他的。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接(Connection)可见,是该Connection私有的,类似于加锁,并在连接断开connection.close()时自动删除 ;
当exclusive = false则设置队列为非排他的,此时不同连接(Connection)的管道Channel可以使用该队列 ;
注意2点:

排他队列是 基于连接(Connection) 可见的,同个连接(Connection)的不同管道 (Channel) 是可以同时访问同一连接创建的排他队列 。其他连接是访问不了的 ,强制访问将报错:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello-testExclusice' in vhost '/'.;

以下声明是没问题的:

	Channel channel = connection.createChannel();
    Channel channel2 = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, true, false, null);
    channel2.queueDeclare(QUEUE_NAME, false, true, false, null);

	=>如果是不同的 connection 创建的 channel 和 channel2,那么以上的
	=>channel2.queueDeclare()是会报错的!!!!!!

"首次" 是指如果某个连接(Connection)已经声明了排他队列,其他连接是不允许建立同名的排他队列的。这个与普通队列不同:即使该队列是持久化的(durable = true),一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
4. autoDelete: 是否自动删除 ;如果autoDelete = true,当所有消费者都与这个队列断开连接时,这个队列会自动删除。注意: 不是说该队列没有消费者连接时该队列就会自动删除,因为当生产者声明了该队列且没有消费者连接消费时,该队列是不会自动删除的。

5. arguments: 设置队列的其他一些参数,如 x-rnessage-ttl 、x-expires 、x-rnax-length 、x-rnax-length-bytes、 x-dead-letter-exchange、 x-deadletter-routing-key 、 x-rnax-priority 等。


basicPublish() 方法是基础的发布消息方法,它有四个参数

  1. String exchange : 交换机名, 当不使用交换机时,传入“”空串。
  2. String routingKey :(路由地址) 发布消息的队列, 无论channel绑定那个队列,最终发布消息的队列都由该字串指定。
  3. AMQP.BasicProperties props :消息的配置属性,例如 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息持久化。
  4. byte[] body :消息数据本体, 必须是byte数组

定义一个消息队列时,只有该队列不存在的时候才能被创建。

消息是二进制数组,因此你可以根据需要指定编码。

完整的Send.java如下: 

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

接收消息

消费者监听RabbitMq中的消息,因此与生产者发送一条消息就退出不同,消费者要保持运行状态来接收消息并打印出来。

与生产者相同,我们需要创建Connetcion和Channel、定义队列(需要监听并接收消息的队列):

public class Recv {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

  }
}

注意我们也在这里声明队列,因为我们可能在生产者之前启动消费者。我们想要确保在我们尝试消费消息的时候队列就已经存在了。

这里我们为什么不使用try-with-resource表达式自动关闭channel和connection?这样我们就可以使我们的程序一直保持运行状态,如果把channel、connection这些关了,程序也就停止了。这就尴尬了,因为我们需要保持消费者一直处于异步监听消息过来的状态。

RabbitMq会将队列中的消息异步地推送过来,我们需要提供一个回调函数来缓存消息直到我们需要用到这些消息:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

basicConsume方法会启动一个消费者,并返回服务端生成的消费者标识,它的几个参数是
1. queue:队列名
2. autoAck:true 接收到传递过来的消息后自动acknowledged(应答服务器),false 接收到消息后不自动应答服务器
3. deliverCallback: 当一个消息发送过来后的回调接口
4. cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用
方法的返回值是服务端生成的消费者标识

完整的接收端代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Recv {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

接下来创建一个工作队列,用于在多个工作者之间分配耗时的任务。

工作队列(即任务队列)的主要思想是避免立即执行那些需要等他们执行完成的资源密集型任务。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列,后台运行的工作进程将取出任务并执行完成。如果你启动了多个工作者,这些任务将在多个工作者之间分享。

这个概念也即我们说的异步,在项目中,有时候一个简单的Web请求,后台要做一系列的操作。这时候,如果后台执行完成之后再给前台返回消息将会导致浏览器页面等待从而出现假死状态。因此,通常的做法是,在这个Http请求到后台,后台获取到正确的参数等信息后立即给前台返回一个成功标志,然后后台异步地进行后续的操作

准备

本章中,我们将发送字符串消息来模拟复杂的任务。这里因为没有一个真实的复杂任务,因此用Thread.sleep()方法来模拟复杂耗时的任务。我们用字符串中的含点(“.")的数量来表示任务的复杂程度,一个点表示一秒钟的耗时,例如:一个发送”Hello ...“字符串的任务将会耗时3秒钟。

修改前面的Send.java为NewTask.java,允许从命令行发送消息。修改后的程序会把任务发送到工作队列:

完整的NewTask.java代码为:

public class NewTask {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        try(Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel()) {

            channel.queueDeclare(QUEUE_NAME,false,false,false,null);

            String message = String.join(" ", argv);

            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Recv.java也要做一些修改:模拟字符串消息中的每个点耗时1秒钟,它将处理传送过来的消息并执行任务,修改后的程序名字是Work.java:

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
  }
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

doWork是模拟执行过程中耗时的伪任务:

private static void doWork(String task) throws InterruptedException {
     for (char ch: task.toCharArray()) {
         if (ch == '.') Thread.sleep(1000);
     }
}

完整的Work.java为:

public class Worker {
    private final static String TASK_QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(" [x] Done");
            }
        };

        boolean autoAck = true; // acknowledgment is covered below
        channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    }

    private static void doWork(String task) throws InterruptedException {
        for (char ch: task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }
}

使用工作队列的优点之一是能够轻松地进行并行化操作。假设我们在做一个后台日志收集系统,我们可以很容易地增加更多的Worker从而提高系统性能。

首先,我们同时启动两个Worker,接下来,我们先后启动5个Task,并分别通过main()参数传入五个字符串消息:

First message.
Second message..
Third message...
Fourth message....
Fifth message.....

看一下两个Worker都接收到了什么样的消息

 默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均每个消费者将得到相同数量的消息。这种消息的调度方式称之为循环调度,你可以开启更多的Worker来进行测试。

因为消费者执行一个任务会有时间耗时,假设一个消费者在执行一个任务执行一半的时候挂掉了将会怎样?消息会不会因此丢失?在我们目前的代码里,一旦RabbitMq将一条消息转发给了一个消费者后,将会立即将消息删除(注意Worker.java里的autoAck目前设置为true),因此,在我们上面例子里,如kill掉一个正在处理数据的Worker,那么该数据将会丢失。不仅如此,所有那些指派给该Worker的还未处理的消息也会丢失。

但在实际工作中,我们并不希望一个Worker挂掉之后就会丢失数据,我们希望的是:如果该Worker挂掉了,所有转发给该Worker的消息将会重新转发给其他Worker进行处理(包括处理了一半的消息)。为了确保一条消息永不丢失,RabbitMq支持消息回执。消费者在接收到一条消息,并且成功处理完成之后会给RabbitMq回发一条确认ack确认消息,RabbitMq此时才会删除该条消息。如果一个Worker正在处理一条消息时挂掉了(信道关闭、连接关闭、TCP连接丢失),它将没有机会发送ack回执,RabbitMq就认为该消息没有消费成功于是便会将该消息重新放到队列中,如果此时有其他消费者还是在线状态,RabbitMq会立即将该条消息再转发给其他在线的消费者。这种机制可以保证任何消息都不会丢失。

默认情况下,需要手动进行消息确认,在前面的例子里,我们通过autoAck=true显示地关闭了手动消息确认,因此,RabbitMq将采用自动消息确认的机制。现在,我们修改我们的程序,采用手动发送回执的方式,当我们完成对消息的处理后,再手动发送回执确认:

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  String message = new String(delivery.getBody(), "UTF-8");

  System.out.println(" [x] Received '" + message + "'");
  try {
    doWork(message);
  } finally {
    System.out.println(" [x] Done");
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

可以看到将autoAck设置为了false。

ack发送信道必须和接收消息的信道(channel)是同一个,如果尝试通过一个不同的信道发送ack回执,将会抛出channel等级协议异常(官网说会抛出异常,但在实际测试中并没有抛异常,只是该条消息得不到回执,从而也无法删除)。另一个常见的错误是关闭了自动ack后忘了手动回执,虽然只是一个简单的错误,但是带来的后果却是严重的,它将导致已经消费掉的消费不会被删除,并且当消费该消息的消费者在退出之后,RabbitMq会将该条消息重新进行转发,内存将被慢慢耗尽。我们可以通过下面的命令来检查这种错误:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

 该命令有三列内容,第一列是在监听的队列名称,第二列是Ready状态的消息数量,第三列是Unacked的消息数量。

消息的持久化

前面讲了如何保证当消费者挂掉之后消息不被丢失,但是,如果RabbitMq服务或者部署RabbitMq的服务器挂掉了之后,消息仍然会丢失。当RabbitMq崩溃之后,它将会忘记所有的队列和消息,除非,有什么机制让RabbitMq将队列信息和消息保存下来。

要确保消息和队列不会丢失,我们必须要确保两件事情。

首先,我们要确保RabbitMq永远不丢失队列,要做到这点,我们在定义的时候就需要告诉RabbitMq它是需要持久化的,通过指定durable参数实现:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这个命令本身是正确的,但需要注意的是我们前面hello队列是一个非持久化队列,RabbitMq不允许重新定义一个已经存在的队列(用不同的参数),否则会抛出异常。

com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, 
reply-text=PRECONDITION_FAILED - parameters for queue 'hello' in vhost '/' not equivalent, class-id=50, method-id=10)

要么重启RabbitMq让该临时队列消失,要么在控制台将该队列删除,或重新创建一个新的队列:

1 boolean durable = true;
2 channel.queueDeclare("task_queue", durable, false, false, null);

生产者和消费者要做同步修改。

做完上面这一步就保证了队列(task_quee)的持久化,此时,即便RabbitMq崩溃了也不会丢失该队列,当RabbitMq重启后将自动重新加载该队列。

其次,要确保消息也被持久化。要做到这一点,在生产者发布消息的时候需要指定消息的属性为:PERSISTENT_TEXT_PLAIN。

import com.rabbitmq.client.MessageProperties;
 
channel.basicPublish("", "task_queue",
                     MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意,即便设置了消息的持久化属性也不能保证消息会被100%地写入到磁盘中。因为RabbitMq在接收到消息和写入到磁盘不是同步的,消息可能只是被写入到缓存中而还没来得及写入磁盘,RabbitMq崩溃了,此时也会丢失消息。但无论如何,比前面简单的消息队列已经强大了很多。

公平调度

你可能已经注意到,此时任务调度仍然不能完全按照我们希望的方式工作。举个例子,在只有两个Worker的环境中,奇数的消息比较重,偶数的消息比较轻时,一个Worker将会一直处于忙碌状态,而另一个Worker将会一直处于空闲状态,但RabbitMq并不知道这种情况,它会依然均衡地向两个Worker传递消息。发生这种情况是因为,当一个消息进入队列之后,RabbitMq只是盲目地将该第n个消息转发给第n个消费者,它并不关注每个消费者发了多少个回执。

为了解决这个问题,我们可以通过调用basicQos方法,给它传入1。这将告诉RabbitMq不要同时给一个队列转发多于1条的消息,换句话说,在一个消费者没有完成并回执前一条消息时,不要再给它转发其他消息。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

 完整的代码

NewTask.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        String message = String.join(" ", argv);

        channel.basicPublish("", TASK_QUEUE_NAME,
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
  }

}

Worker.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

public class Worker {

  private static final String TASK_QUEUE_NAME = "task_queue";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    final Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    channel.basicQos(1);

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");

        System.out.println(" [x] Received '" + message + "'");
        try {
            doWork(message);
        } finally {
            System.out.println(" [x] Done");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    };
    channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  }

  private static void doWork(String task) {
    for (char ch : task.toCharArray()) {
        if (ch == '.') {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }
  }
}

 工作队列模式的设想是每一条消息只会被转发给一个消费者。接下来会讲解另一种完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式

为了阐述这个模式,我们将会搭建一个简单的日志系统,它包含两种程序:一种发送日志消息,另一种接收并打印日志消息。在这个日志系统里,生产者发布的消息将会以广播的形式让每一个运行的消费者都可以获取的到,我们让其中一个消费者接收消息并写入磁盘,另一个消费者接收消息并打印在电脑屏幕上。

交换器(Exchange)

回忆一下前面的内容:

  • 一个生产者用以发送消息;
  • 一个队列缓存消息;
  • 一个消费者用以消费队列中的消息。

RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。

实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,把接收到的消息推送给队列。

一个exchage必须清楚地知道如何处理一条消息。

  

有四种类型的交换器,分别是:direct、topic、headers、fanout。主要讲解最后一种:fanous(广播模式)。下面创建一个fanout类型的交换器,将创建的交换机命名为logs,类型是fanout:

channel.exchangeDeclare("logs", "fanout");

广播模式交换器很简单,从字面意思也能理解,就是把接收到的消息推送给所有它知道的队列。在我们的日志系统中正好需要这种模式。

如果想查看当前系统中有多少个exchange,可以使用以下命令:

sudo rabbitmqctl list_exchanges

  

或者通过控制台查看(Exchanges标签下):

可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。

在前面工作队列的指南中,我们并未显式的使用交换器(指定交换器为字符串""),但是依然可以将消息发送到队列中,其实并不是我们没有使用交换器,实际上是我们使用的是默认交换器,回顾一下我们之前是如何发送消息的:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数是交换器的名字,空字符串表示它是一个默认或无命名的交换器,消息将会由指定的路由键(第二个参数,routingKey,后面会讲)转发到队列。

既然exchange可以指定为空字符串(""),那么可否指定为null?答案是:不能!

在AMQImpl类中的Publish()方法中,不光是exchange不能为null,routingKey路由键也不能为null,否则会抛出异常:

现在,可以显式的使用刚创建的交换器:

channel.basicPublish( "logs", "", null, message.getBytes());

临时队列

前面我们使用的队列都是有具体的队列名(hello),创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

但是,现在讲解的日志系统也可以使用非命名队列(可以不手动命名),我们希望收到所有日志消息,并且我们希望总是接收到新的日志消息而不是旧的日志消息。为了解决这个问题,需要分两步走。

首先,无论何时当消费者连接到RabbitMq,都需要一个新的、空的队列来接收日志消息,因此,消费者在连接上RabbitMq之后需要创建一个任意名字的队列或者让RabbitMq生成一个任意的队列名字。

其次,一旦该消费者断开了与RabbitMq的连接,队列也被自动删除。

使用无参的queueDeclare(),就可以创建一个非持久化、专有的、自动删除且名字随机生成的队列。

String queueName = channel.queueDeclare().getQueue();

绑定(Binding)

  

当广播模式的交换器和队列已经创建好了,接下来就是要告诉交换器向队列里发送消息。交换器与队列之间的关系称之为绑定关系

channel.queueBind(queueName, "logs", "");

queueBind的第三个参数是routingkey。

至此,交换器已经可以往队列中发送消息了。

可以通过下列命令来查看队列的绑定关系:

rabbitmqctl list_bindings

完整的代码  

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

            String message = "RabbitMq fanout......";
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("utf-8"));

            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

Connection创建完成之后,定义了exchange,这一步是必要的,因为如果没有交换器将无法发送消息。如果没有队列绑定到该交换器上,那么,交换器收到的消息将会丢失掉,但是对本章的日志系统来说没问题的,当没有消费者时,就完全的放弃掉数据,消费者连接上时,只接收最新的日志消息就好。

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

        final String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue,EXCHANGE_NAME,"");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTa,delivery) -> {

            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");

        };

        channel.basicConsume(queue,true,deliverCallback,consumerTag -> {});
    }
}

basicConsume的autoAck设置为true,因为现在是广播模式,每个消费者都会收到一样的消息,并且这里给消费者生产的随机名称的队列相当于是独有的,所以在接收到消息之后立即发送确认回执是OK的。

现在已经可以把消息广播给很多的消费者。接下来我们将增加一个特性:订阅这些信息中的一些信息。例如,只将error级别的错误存储到硬盘中,同时可以将所有级别(error、info、warning等)的日志都打印在控制台上。

绑定(Bindings)

回顾一下创建绑定关系的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

一个绑定是一个交换器与队列之间的关系。意思是指:这个队列对这个交换器的消息感兴趣。

该方法同时还有另一个routing Key参数(第三个参数),为了避免与basicPublish参数中的路由键(routing key)混淆,我们称之为绑定键(bingind key),下面展示了如何通过一个绑定key创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键(这里是"black")的含义依赖于交换器的类型。在我们的日志系统中,交换器类型是fanout,绑定键没有任何意义,会被忽略掉。

直连交换机(Direct Exchange)

我们正在用的广播模式的交换器并不够灵活,它只是不加思索地进行广播。现在使用direct exchange来代替。直连交换器的路由算法非常简单:将消息推送到binding key与该消息的routing key相同的队列。

请看下图:

  

在该图中,直连交换器X上绑定了两个队列。第一个队列的绑定键orange,第二个队列有两个绑定键:black和green。在这种场景下,一个消息在发布时(basicPublish)指定的路由键若为orange,则该消息将只被路由到队列Q1,若路由键为black或green的消息,将只被路由到队列Q2。其他的消息都将被丢失。

多重绑定

  

同一个绑定键可以绑定到不同的队列上去,在上图中,交换器X与队列Q2的绑定键也是black,在这种情况下,直连交换器将会和广播交换器有着相同的行为,将消息推送到所有匹配的队列。一个路由键为black的消息将会同时被推送到队列Q1和Q2。

发送日志的代码片段

//一如既往的先创建一个交换器:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//发送消息:"severity"参数是日志系统中“info”、“warning”和“error”中的一个。
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

接收消息的代码片段与之前的基本相同,只是需要在创建绑定关系时,指定severity的值(也就是绑定值):

String queueName = channel.queueDeclare().getQueue();
for(String severity : argv){
   channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

完整的代码

  EmitLogDirect.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    try (Connection connection = factory.newConnection();
         Channel channel = connection.createChannel()) {
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(argv);
        String message = getMessage(argv);

        channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
    }
  }
  //..
}

发送者发送消息的routingKey和消息都来自于命令行传递过来的argv参数中。

ReceiveLogsDirect.java

import com.rabbitmq.client.*;

public class ReceiveLogsDirect {

  private static final String EXCHANGE_NAME = "direct_logs";

  public static void main(String[] argv) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    String queueName = channel.queueDeclare().getQueue();

    if (argv.length < 1) {
        System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
        System.exit(1);
    }

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] Received '" +
            delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
    };
    channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  }
}

消费者接收消息时,queueBind的bindingKey也要来自于命令行的argv参数。

用direct交换器替换了fanout交换器,使得我们可以有选择性地接收消息。尽管如此,仍然还有限制:不能基于多个标准进行路由。

在日志系统中,我们可能不仅希望根据日志等级订阅日志,还希望根据日志来源订阅日志。这个概念来自于unix工具syslog,它不仅可以根据日志等级(info/warn/crit...)来路由日志,同时还可以根据设备(auth/cron/kern...)来路由日志,这将更加灵活。我们可能希望只监听来自'cron'的error级别日志,同时又要接收来自'kern'的所有级别的日志。我们的日志系统如果要实现这个功能,就需要使用到另外一种交换器:主题交换器(Topic Exchange)。

主题交换器(Topic Exchange)

发送到主题交换器的消息的routing key必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。比如以下是几个有效的routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit",routing key的单词可以有很多,最大限制是255 bytes。

binding key必须与routing key模式一样。Topic交换器的逻辑与direct交换器有点相似:使用特定路由键发送的消息将被发送到所有使用匹配的绑定键绑定的队列。然而,绑定键有两个特殊的情况,如下:

  • 表示匹配任意一个单词
  • # 表示匹配任意一个或多个单词

下图表示了这这两个通配符的用法:

  

在这个例子中,我们将发送所有跟动物有关的消息,这些消息将会发送到由三个单词,两个点号组成的routing key,第一个单词了表示的是速度,第二个单词表示颜色,第三个单词表示种类:

  "<speed>.<colour>.<species>"。

  我们创建三个绑定关系:队列Q1绑定到绑定键*.orange.* ,队列Q2绑定到*.*.rabbit和lazy.#。

  总结下来就是:

  • 队列Q1对橘黄色(orange)颜色的所有动物感兴趣;
  • 队列Q2对所有的兔子(rabbit)和所有慢吞吞(lazy)的动物感兴趣。

一个路由为 "quick.orange.rabbit"的消息,将会被转发到这两个队列,路由为"lazy.orange.elephant"的消息也被转发给这两个队列,路由为 "quick.orange.fox"的消息将只被转发到Q1队列,路由为 "lazy.brown.fox"的消息将只被转发到Q2队列。"lazy.pink.rabbit" 只被转发到Q2队列一次(虽然它匹配绑定键*.*.rabbit和lazy.#),路由为 "quick.brown.fox"的消息与任何一个绑定键都不匹配,因此将会被丢弃。

如果我们发送的消息的的路由是由一个单词“orangle"或4个单词”quick.orangle.male.rabbit“将会怎样?会因为与任何一个绑定键不匹配而被丢弃。若路由为 "lazy.orange.male.rabbit"的消息,因为匹配"lazy.#"绑定键,因而会被转发到Q2队列。

Topic交换器非常强大,可以像其他类型的交换器一样工作:当一个队列的绑定键是"#"是,它将会接收所有的消息,而不再考虑所接收消息的路由键,就像是fanout交换器一样;当一个队列的绑定键没有用到”#“和”*“时,它又像direct交换一样工作。

2、完整的代码

  下面是在我们日志系统中采用Topic交换器的完整代码,日志消息的路由由两个单词组成:"<facility>.<severity>"。

EmitLogTopic.java

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic {

    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost);

        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String message = "A critical kernel error";
            String routingKey = "kern.critical";

            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("utf-8"));

            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }
}

ReceiveLogsTopic.java

import com.rabbitmq.client.*;

public class ReceiveLogsTopic {

    private final static String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String queueName = channel.queueDeclare().getQueue();

        if (args.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/75711.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

怎样让音频速度变慢?请跟随以下方法进行操作

怎样让音频速度变慢&#xff1f;在会议录音过程中&#xff0c;经常会遇到主讲人语速过快&#xff0c;导致我们无法清晰听到对方说的内容。如果我们能够减慢音频速度&#xff0c;就能更好地记录对方的讲话内容。此外&#xff0c;在听到快速播放的外语或方言时&#xff0c;我们也…

YOLOv5基础知识入门(3)— 目标检测相关知识点

前言&#xff1a;Hello大家好&#xff0c;我是小哥谈。YOLO算法发展历程和YOLOv5核心基础知识学习完成之后&#xff0c;接下来我们就需要学习目标检测相关知识了。为了让大家后面可以顺利地用YOLOv5进行目标检测实战&#xff0c;本节课就带领大家学习一下目标检测的基础知识点&…

MySQL入门学习教程(一)

mysql简介 1、什么是数据库 &#xff1f; 数据库&#xff08;Database&#xff09;是按照数据结构来组织、存储和管理数据的仓库&#xff0c;它产生于距今六十多年前&#xff0c;随着信息技术和市场的发展&#xff0c;特别是二十世纪九十年代以后&#xff0c;数据管理不再仅仅…

【数据库系统】-- 【1】DBMS概述

1.DBMS概述 01数据库系统概述02数据库技术发展概述03关系数据库概述04数据库基准测试 01数据库系统概述 几个基本概念 为什么使用数据库系统 数据库发展的辉煌历程 02数据库技术发展概述 数据模型 应用领域 ● OLTP ● OLAP ● HTAP ● GIS OLTP与OLAP 与其他技术相…

代码随想录算法训练营(二叉树总结篇)

一.二叉树的种类 1.满二叉树&#xff1a;就是说每一个非叶子节点的节点都有两个子节点。 2.完全二叉树&#xff1a;此二叉树只有最后一层可能没填满&#xff0c;并且存在的叶子节点都集中在左侧&#xff01;&#xff01;&#xff01; &#xff08;满二叉树也是完全二叉树&…

【UE4 RTS】09-Day and Night

前言 本篇博客实现的效果是太阳和天空会随着游戏时间的变化而变化。 效果 步骤 1. 设置“LightSource”为可移动的 2. 新建一个文件夹&#xff0c;命名为“Lighting” 3. 打开游戏状态“RTS_GameState_BP”&#xff0c;添加一个函数命名为“GetGameSpeed” 添加一个浮点类型…

bilibili倍数脚本,油猴脚本

一. 内容简介 bilibili倍数脚本&#xff0c;油猴脚本 二. 软件环境 2.1 Tampermonkey 三.主要流程 3.1 创建javascript脚本 点击添加新脚本 就是在 (function() {use strict;// 在这编写自己的脚本 })();倍数脚本&#xff0c;含解析 // UserScript // name bi…

TikTok带货有什么优势?品牌营销的新趋势

在当今数字化时代&#xff0c;品牌营销正日益倾向于社交媒体平台&#xff0c;而TikTok作为一款全球热门的短视频社交平台&#xff0c;正在成为品牌营销的新趋势。TikTok带货&#xff0c;也就是品牌利用TikTok平台进行商品推广和销售&#xff0c;已成为一种创新的、高效的营销方…

GPT-4 如何为我编写测试

ChatGPT — 每个人都在谈论它,每个人都有自己的观点,玩起来很有趣,但我们不是在这里玩— 我想展示一些实际用途,可以帮助您节省时间并提高效率。 我在本文中使用GPT-4 动机 我们以前都见过这样的情况——代码覆盖率不断下降的项目——部署起来越来越可怕,而且像朝鲜一样…

【深入理解ES6】块级作用域绑定

1. var声明及变量提升机制 提升&#xff08;Hoisting&#xff09;机制&#xff1a;通过关键字var声明的变量&#xff0c;都会被当成在当前作用域顶部生命的变量。 function getValue(condition){if(condition){var value "blue";console.log(value);}else{// 此处…

Jetpack之MutableLiveData和LiveData源码分析

先看一下MutableLiveData的源码&#xff0c;它是继承于LiveData,主要是重写了setValue和postValue方法。 上图我们知道这两个方法都是调用了livedata的各自对应的方法&#xff0c;我们点进去看看livedata的这两个方法是protect 的 允许子类和自己调用&#xff0c;而MutableLiv…

Qt在mac安装

先在app store下载好Xcode 打开Xcode 随便建个文件给它取个名字找个地方放提醒没建立git link,不用理他打开终端, 输入/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

SpringMVC学习笔记

springMVC简单入门 快速搭建 pom.xml依赖 <!-- 导入坐标 springmvc 与 servlet --><dependencies><dependency><groupId>javax.servlet</groupId><artifactId>javax.servlet-api</artifactId><version>3.1.0</version>…

回归预测 | MATLAB实现GRNN广义回归神经网络多输入多输出预测

回归预测 | MATLAB实现GRNN广义回归神经网络多输入多输出预测 目录 回归预测 | MATLAB实现GRNN广义回归神经网络多输入多输出预测预测效果基本介绍程序设计往期精彩参考资料 预测效果 基本介绍 MATLAB实现GRNN广义回归神经网络多输入多输出预测&#xff0c;输入10个特征&#x…

re学习(32)【绿城杯2021】babyvxworks(浅谈花指令)

链接&#xff1a;https://pan.baidu.com/s/1msA5EY_7hoYGBEema7nWwA 提取码&#xff1a;b9xf wp:首先找不到main函数&#xff0c;然后寻找特殊字符串&#xff0c; 交叉引用 反汇编 主函数在sub_3D9当中&#xff0c;但是IDA分析错了 分析错误后&#xff0c;删除函数 创建函数 操…

从零构建深度学习推理框架-8 卷积算子实现

其实这一次课还蛮好理解的&#xff1a; 首先将kernel展平&#xff1a; for (uint32_t g 0; g < groups; g) {std::vector<arma::fmat> kernel_matrix_arr(kernel_count_group);arma::fmat kernel_matrix_c(1, row_len * input_c_group);for (uint32_t k 0; k < k…

UI美工设计岗位的基本职责概述(合集)

UI美工设计岗位的基本职责概述1 1、有良好的美术功底、设计新颖&#xff0c;整体配色及设计创意理念&#xff0c;能够独立完成整个网站页面设计及制作; 2、熟练运用DIV CSS&#xff0c;HTML 设计制作网页 ; 3、熟练运用Photoshop,Dreamweaver,Coreldraw(或Illustrator),Fla…

LabVIEW控制通用工作台

LabVIEW控制通用工作台 用于教育目的的计算机化实验室显着增长&#xff0c;特别是用于运动控制的实验室。它们代表了各种工业应用中不断扩大的领域&#xff0c;并成为以安全的方式使用通常昂贵或独特的实验室设备进行实时实验的宝贵工具。NI LabVIEW等软件应用程序的开发和不断…

【数据结构OJ题】反转链表

原题链接&#xff1a;https://leetcode.cn/problems/reverse-linked-list/description/ 目录 1. 题目描述 2. 思路分析 3. 代码实现 1. 题目描述 2. 思路分析 方法一&#xff1a;三指针翻转法 使用三个结构体指针n1&#xff0c;n2&#xff0c;n3&#xff0c;原地修改结点…

开发过程中遇到的问题以及解决方法

巩固基础&#xff0c;砥砺前行 。 只有不断重复&#xff0c;才能做到超越自己。 能坚持把简单的事情做到极致&#xff0c;也是不容易的。 开发过程中遇到的问题以及解决方法 简单易用的git命令 git命令&#xff1a; 查看有几个分支&#xff1a;git branch -a 切换分支&#…