创建延时队列
queue.file_delay_destroy
x-dead-letter-exchange: exchange.file_delay_destroy
x-message-ttl: 259200000
259200000为3天,1000为1秒
创建普通队列
queue.file_destroy
创建普通交换机
exchange.file_delay_destroy
type选择fanout
交换机绑定普通队列
(图中已经绑定,红框为绑定过程)
普通队列绑定交换机
(图中已经绑定,红框为绑定过程)
延时队列
springboot配置多个rabbitmq
延时队列时间到之后,将消息发送给queue.file_destroy,执行删除文件操作
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import lombok.Data;
import lombok.EqualsAndHashCode;
import java.io.Serializable;
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonIgnoreProperties(ignoreUnknown = true)
@EqualsAndHashCode
@Data
public class MQMessage implements Serializable {
private JSONObject msg;
private String messageId; //存储消息发送的唯一标识
}
import com.sxqx.entity.MQMessage;
public interface MQMessageSender {
/**
*
* @param queue 消息队列名称
* @param msg 消息
*/
void send(String queue, MQMessage msg);
}
RabbitConfig配置类
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class RabbitConfig {
@Primary
@Bean(name="mq1ConnectionFactory")
public ConnectionFactory mq1ConnectionFactory(
@Value("${spring.rabbitmq.mq1.host}") String host,
@Value("${spring.rabbitmq.mq1.port}") int port,
@Value("${spring.rabbitmq.mq1.username}") String username,
@Value("${spring.rabbitmq.mq1.password}") String password,
@Value("${spring.rabbitmq.mq1.virtual-host}") String virtualHost
){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean(name="mq2ConnectionFactory")
public ConnectionFactory mq2ConnectionFactory(
@Value("${spring.rabbitmq.mq2.host}") String host,
@Value("${spring.rabbitmq.mq2.port}") int port,
@Value("${spring.rabbitmq.mq2.username}") String username,
@Value("${spring.rabbitmq.mq2.password}") String password,
@Value("${spring.rabbitmq.mq2.virtual-host}") String virtualHost
){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Primary
@Bean(name="mq1RabbitTemplate")
public RabbitTemplate mq1RabbitTemplate(
@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory
){
RabbitTemplate mq1RabbitTemplate = new RabbitTemplate(connectionFactory);
mq1RabbitTemplate.setMessageConverter(jsonMessageConverter());
return mq1RabbitTemplate;
}
@Bean(name="mq2RabbitTemplate")
public RabbitTemplate mq2RabbitTemplate(
@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory
){
RabbitTemplate mq2RabbitTemplate = new RabbitTemplate(connectionFactory);
mq2RabbitTemplate.setMessageConverter(jsonMessageConverter());
return mq2RabbitTemplate;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean(name = "mq1Factory")
@Primary
public SimpleRabbitListenerContainerFactory mq1Factory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean(name = "mq2Factory")
public SimpleRabbitListenerContainerFactory mq2Factory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
mq1
@Component
public class RabbitMQ1MessageSender implements MQMessageSender, RabbitTemplate.ConfirmCallback{
@Resource(name = "mq1RabbitTemplate")
private RabbitTemplate mq1RabbitTemplate;
Log log = LogFactory.getLog(RabbitMQ1MessageSender.class);
@Autowired
public RabbitMQ1MessageSender() {
}
@PostConstruct
public void init(){
mq1RabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ack){
log.error("消息接收失败" + cause);
// 我们这里要做一些消息补发的措施
System.out.println("id="+correlationData.getId());
}
}
@Override
public void send(String routingKey, MQMessage msg) {
String jsonString = JsonConverter.bean2Json(msg);
if (jsonString != null) {
mq1RabbitTemplate.convertAndSend(routingKey, jsonString);
}
}
}
mq2
import com.sxqx.entity.MQMessage;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Component
public class RabbitMQ2MessageSender implements MQMessageSender, RabbitTemplate.ConfirmCallback{
@Resource(name = "mq2RabbitTemplate")
private RabbitTemplate mq2RabbitTemplate;
Log log = LogFactory.getLog(RabbitMQ2MessageSender.class);
@Autowired
public RabbitMQ2MessageSender() {
}
@PostConstruct
public void init(){
mq2RabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ack){
log.error("消息接收失败" + cause);
// 我们这里要做一些消息补发的措施
System.out.println("id="+correlationData.getId());
}
}
@Override
public void send(String routingKey, MQMessage msg) {
String jsonString = JsonConverter.bean2Json(msg);
if (jsonString != null) {
mq2RabbitTemplate.convertAndSend(routingKey, jsonString);
}
}
}
application-prod.yaml
spring:
rabbitmq:
mq1:
username: guest
password: guest
host: mq1_ip
port: 5672
virtual-host: /
publisher-returns: true
publisher-confirm-type: simple
listener:
simple:
acknowledge-mode: auto # 手动应答
prefetch: 10 #每次从队列中取一个,轮询分发,默认是公平分发
retry:
max-attempts: 5 # 重试次数
enabled: true # 开启重试
concurrency: 5
max-concurrency: 10
mq2:
username: guest
password: guest
host: mq2_ip
port: 5672
virtual-host: /
publisher-returns: true
publisher-confirm-type: simple
listener:
simple:
acknowledge-mode: auto # 手动应答
prefetch: 10 #每次从队列中取一个,轮询分发,默认是公平分发
retry:
max-attempts: 5 # 重试次数
enabled: true # 开启重试
concurrency: 5
max-concurrency: 10
mq1消费端,发消息给mq2
@Component
public class SyncWeatherLivePicMessageReceiver implements IMessageReceiver {
private final IWeatherLivePicMapper weatherLivePicMapper;
private final RabbitMQ2MessageSender rabbitMQ2MessageSender;
@Autowired
public SyncWeatherLivePicMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper,
RabbitMQ2MessageSender rabbitMQ2MessageSender) {
this.weatherLivePicMapper = weatherLivePicMapper;
this.rabbitMQ2MessageSender = rabbitMQ2MessageSender;
}
Log log = LogFactory.getLog(SyncWeatherLivePicMessageReceiver.class);
private FtpHelper ftpHelperIns1;
private FtpHelper ftpHelperIns2;
private FTPFileFilter ftpFileFilter;
@RabbitListener(queuesToDeclare = {
@Queue(name = "sxqxgzbgxw_weather_live_pic")
})
@RabbitHandler
@Override
public void onMessageReceived(String mqMessageString) {
JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);
JsonNode msg = jsonNode.findValue("msg");
JsonNode JsonNodeParams = msg.findValue("params");
Map<String, Object> params = JsonConverter.jsonNode2HashMap(JsonNodeParams);
if (params.size() > 0) {
String times = params.get("times").toString();
String serverFrom = params.get("serverFrom").toString();
......
// 清除数据
MQMessage mqMessage = new MQMessage();
JSONObject jsonObject = new JSONObject();
jsonObject.put("filePath", "/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times);
mqMessage.setMsg(jsonObject);
rabbitMQ2MessageSender.send("queue.file_delay_destroy", mqMessage);
// 清除DB记录
MQMessage mqMessage2 = new MQMessage();
JSONObject jsonObject2 = new JSONObject();
jsonObject2.put("tableName", "WEATHER_LIVE_PIC");
jsonObject2.put("picUrl", "http://外网ip/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/"+ times +"/"+ type + "/" + "gjzjqyz/"+ fileNameFrom);
mqMessage2.setMsg(jsonObject);
rabbitMQ2MessageSender.send("queue.db_delay_destroy", mqMessage2);
}
}
}
mq2消费端用于递归删除文件
import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.utils.dataConverter.JsonConverter;
import com.sxqx.utils.file.FileHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.File;
@Component
public class FileDestroyMessageReceiver implements IMessageReceiver {
Log log = LogFactory.getLog(FileDestroyMessageReceiver.class);
@RabbitListener(queuesToDeclare = {
@Queue(name = "queue.file_destroy")
})
@RabbitHandler
@Override
public void onMessageReceived(String mqMessageString) {
JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);
JsonNode msg = jsonNode.findValue("msg");
String filePath = msg.findValue("filePath").asText();
if (filePath.contains("/data/static/dataSharingStatic/weatherlive/colorFigure/png/610000/610000")) {
File file = new File(filePath);
if (file.exists()) {
FileHelper.deleteFile(file);
}
} else {
log.info("有人想删除设定之外的文件");
log.info(filePath);
}
}
}
FileHelper工具类递归删除文件或文件夹
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;
public class FileHelper {
/**
* 递归删除文件或文件夹
* @param directory
*/
public static void deleteFile(File directory) {
if (!directory.exists()) {
return;
}
File[] files = directory.listFiles();
if (files!=null) {//如果包含文件进行删除操作
for (File value : files) {
if (value.isFile()) {
//删除子文件
value.delete();
} else if (value.isDirectory()) {
//通过递归的方法找到子目录的文件
deleteFile(value);
}
value.delete();//删除子目录
}
}
directory.delete();
}
}
mq2消费端用于删除数据库数据
import com.fasterxml.jackson.databind.JsonNode;
import com.sxqx.listener.IMessageReceiver;
import com.sxqx.mapper.remote.xugugzb.weatherlivepic.IWeatherLivePicMapper;
import com.sxqx.utils.dataConverter.JsonConverter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Component
public class DBDestroyMessageReceiver implements IMessageReceiver {
private final IWeatherLivePicMapper weatherLivePicMapper;
@Autowired
public DBDestroyMessageReceiver(IWeatherLivePicMapper weatherLivePicMapper){
this.weatherLivePicMapper = weatherLivePicMapper;
}
Log log = LogFactory.getLog(DBDestroyMessageReceiver.class);
@RabbitListener(queuesToDeclare = {
@Queue(name = "queue.db_destroy")
})
@RabbitHandler
@Override
public void onMessageReceived(String mqMessageString) {
JsonNode jsonNode = JsonConverter.jsonString2JsonNode(mqMessageString);
JsonNode msg = jsonNode.findValue("msg");
String tableName = msg.findValue("tableName").asText();
String picUrl = msg.findValue("picUrl").asText();
if (picUrl.contains("/dataSharingStatic/weatherlive/colorFigure/png/610000/610000/")) {
if (Objects.equals("WEATHER_LIVE_PIC",tableName)) {
weatherLivePicMapper.deleteWeatherLivePic(picUrl);
}
} else {
log.info("有人想删除设定之外的数据");
log.info(picUrl);
}
}
}
nginx.conf
上传静态资源至linux path,配置nginx.conf,使浏览器可以直接访问静态资源
server {
listen 80;
server_name 60.204.202.112;
add_header Cache-Control no-store;
charset utf-8;
location / {
root /mnt/sxqxgxw-gzb-front/dist/;
try_files $uri $uri/ /index.html;
index index.html index.htm;
}
#访问/dataSharingStatic时,相当于访问/data/static/dataSharingStatic路径下资源
location /dataSharingStatic {
root /data/static;
autoindex on;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Headers X-Requestd-With;
add_header Access-Control-Allow-Methods GET,POST,OPTIONS;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
location /api/ {
proxy_pass http://60.204.202.112:8896;
rewrite ^/api/(.*)$ /$1 break;
add_header Access-Control-Allow-Origin *;
add_header Access-Control-Allow-Headers X-Requestd-With;
add_header Access-Control-Allow-Methods GET,POST,OPTIONS;
}
}