批量收发实战
发送消息是需要网络连接的如果我们单条发送吞吐量可能没有批量发送好。剖来那个发送可以减少网络IO开销,但是也不能一批次发送太多的数据,需要根据每条消息的大小和网络带宽来确定量的数目。
比如网络带宽为可以支持一次性发送8M的数据包,如果数据包确定不会超过8M,那么我们可以除以每条消息的大小(粗略估算),然后会得到一个数值,这个数值再取70%-80%留一定的缓冲空间。
如果我们一次性发送的数据超过了8M,就需要对这些消息进行分组发送,保证每一组的数据大小不超过8M,每一组发送的数量逻辑也是按照前面这样来计算。
生产者
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("ip:9876");
producer.start();
List<Order> F = OrderBuilder.build(1, "A", "B", "C");
List<Order> S = OrderBuilder.build(2, "D", "Q");
List<Order> T = OrderBuilder.build(3, "N", "Q", "R");
ArrayList<Order> orders = new ArrayList<Order>() {{
addAll(F);
addAll(S);
addAll(T);
}};
List<Message> msgs = new ArrayList<>();
for (Order order : orders) {
Message msg = new Message("test-topic", "test-topic_str", order.toString().getBytes());
msg.setKeys("test-topic_trace");
msgs.add(msg);
}
producer.send(msgs);
}
}
消费者1
public class Consumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("test-topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List< MessageExt > msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
可以看到批量消费的时候没有保证顺序:
消费者2
public class Consumer2 {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
consumer.setNamesrvAddr("ip:9876");
consumer.subscribe("test-topic", "*");
// 使用顺序的方式来消费MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {
System.out.println(String.format("msg {%s} recvTime %s", new String(msg.getBody()), new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
}
}
消费的时候没有产生顺序问题,完全是按照批量发送的顺序: