之前伙伴对接G7物流平台获取温度、轨迹数据,写的一塌糊涂,今天来重新对接下。
G7易流
G7物联和易流科技合并后正式发布的品牌,主要面向生产制造与消费物流行业的货主及货运经营者提供软硬一体、全链贯通的SaaS服务。这包括订阅服务(车队管理、安全管理)与交易服务(数字货运、数字能源、智能装备、物联保险),旨在为中小货运经营者提供智能一体化经营平台,同时为大型货主提供行业级解决方案。G7易流的品牌应用范围广泛,包括财运通、智能管车、数字货仓、数字能源、数字保险、端到端可视、运力供应链等。
截至2023年5月,G7易流在物流数字化领域服务的客户覆盖了全国超过80%的上游货主企业和4万多家货运经营者,服务超过10年的客户有821个,超过5年的有10779个。
G7易流还提供开放的物流IoT平台,共建物流数据共享生态系统。平台提供多样化的API和推送服务,致力于为用户带来稳定、安全、易用的体验。此外,G7易流还提供了数字货运服务,通过AIoT技术连接百万车辆,全国覆盖,帮助货主和卡车运力完成智能匹配。
G7易流网络货运平台贯通物流运输全流程,提供运力沉淀、智能接单、在途可视、回单上传、运费秒结、熟车信息全部线上登记等功能。平台操作简单快捷,自动识别出发到达,司机操作简单,无缝连接,无需重复录单,更方便。G7易流还连接油卡、保险、金融,确保业务数据安全。
G7易流总部位于北京,研发中心在成都,在上海、广州分别设有解决方案中心,业务覆盖全国及周边亚洲国家。G7平台上服务客户数量超过7万家,连接车辆总数超过180万辆,客户类型覆盖快递快运、电商、危化品运输、冷链物流、汽车物流、大宗运输、城市配送、货主等物流全领域。G7以智能终端为基础,用数据连接每一辆卡车、货主、运力主和司机,提升运输服务效率。
引入依赖
<!-- mqtt -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,由IBM在1999年发布。MQTT的特点包括简单易实现、支持QoS(服务质量)、报文精简以及基于TCP/IP。它适用于机器与机器(M2M)通信和物联网(IoT)场景,尤其是在带宽有限或网络不稳定的环境中表现优异。
MQTT协议的核心特性包括轻量级、低功耗、高可靠性、易于实现和支持多种QoS级别。它的工作原理基于客户端、服务器(Broker)和主题(Topic)的概念。客户端可以发布消息到特定主题,也可以订阅主题以接收消息。服务器负责接收、存储和分发消息,维护客户端的连接状态。
在安全性方面,MQTT支持通过TLS/SSL加密连接,以及使用用户名和密码进行身份验证。此外,Broker可以配置访问控制列表(ACL),以限制客户端对特定主题的访问。
MQTT的应用场景非常广泛,包括物联网(IoT)、实时数据流、远程监控、智能家居、工业自动化、智慧城市、农业物联网(AIoT)、车联网、能源管理、远程医疗、物流追踪和环境监测等。
实现MQTT通常涉及选择Broker、开发客户端应用、连接和通信以及考虑安全性问题。开源的MQTT
Broker如Mosquitto,或云服务提供的Broker如AWS IoT、Azure IoT
Hub等,都是常见的选择。开发者可以使用各种编程语言提供的MQTT库来开发客户端应用。
自己去G7平台的用户信息里面查询吧
g7.mqtt.accessId = 111
# 密码
g7.mqtt.secret = 111
# 主题
g7.mqtt.topic = 111
# mqtt服务器地址
g7.mqtt.broker = 111
g7.mqtt.clientId = 111
# 设置超时时间 单位为秒
g7.mqtt.timeout=20
# 设置会话心跳时间 单位为秒
g7.mqtt.keepAliveInterval=50
g7,MQTT客户端
/**
* g7,MQTT客户端
* 包含固定式冷链数据、便携式冷链数据
* <p>
* 开发完提交代码,如果不上生产的话可以注释,防止本地环境启动的时候把服务器上停止
* <p>
* author py.sun
*/
@Component
@Slf4j
public class G7ClientMQTTConfig {
@Value("${g7.mqtt.accessId}")
private String user;
@Value("${g7.mqtt.secret}")
private String passWord;
@Value("${g7.mqtt.topic}")
private String topic;
@Value("${g7.mqtt.broker}")
public String broker;
@Value("${g7.mqtt.clientId}")
private String clientId;
@Value("${g7.mqtt.timeout}")
private int timeout;
@Value("${g7.mqtt.keepAliveInterval}")
private int keepAliveInterval;
@Resource
private G7VehicleDataCallBack g7VehicleDataCallBack;
//qos为1或2时,mqttClient使用
MemoryPersistence persistence = new MemoryPersistence();
// qos2消耗较大,请使用1或0
int qos = 1;
@Bean
@Async
public MqttClient mqttClient() throws MqttException {
// 客户端同步请求
MqttClient client = new MqttClient(broker, clientId, persistence);
try {
//创建连接
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(user);
connOpts.setPassword(passWord.toCharArray());
//cleanSession为false时,下次以相同clientId登录将可以获取存储的所有消息 如果为true,将获取到retained标记的最后一条消息
//cleanSession调试测试阶段设置为true,生产设置为false
connOpts.setCleanSession(false);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
connOpts.setKeepAliveInterval(keepAliveInterval);
// 设置超时时间 单位为秒
connOpts.setConnectionTimeout(timeout);
// 设置自动重连
connOpts.setAutomaticReconnect(true);
// 设置回调 获取订阅消息
client.setCallback(g7VehicleDataCallBack);
client.connect(connOpts);
// 订阅topic,可以订阅不同的主题
client.subscribe(topic, qos);
} catch (MqttException e) {
log.error("MQTT连接异常=", e);
}
return client;
}
}
接收G7的回传数据类
@Component
@Slf4j
public class G7VehicleDataCallBack implements MqttCallback {
@Value("${g7.mqtt.broker}")
public String broker;
@Value("${g7.mqtt.clientId}")
public String clientId;
@Value("${g7.mqtt.topic}")
public String clientTopic;
/**
* 连接断开重试
* param throwable
*/
@SneakyThrows
@Override
@Async
public void connectionLost(Throwable throwable) {
log.error("mqtt connectionLost 连接断开,5S之后尝试重连=", throwable);
try {
MqttClient client = SpringUtils.getBean("mqttClient");
if (!client.isConnected()) {
client.reconnect();
}
// 重连之后重新订阅
client.subscribe(clientTopic, 1);
TimeUnit.SECONDS.sleep(5);
} catch (MqttException e) {
MessageWarningUtils.sendMessage("重新连接mqtt异常=" + e.getMessage(), config.getToken(), config.getSecretKey());
log.error("重新连接mqtt异常=", e);
}
}
/**
* message的处理请放入其他线程中,在该方法中消耗时间过多将影响qos为1或2的响应,使服务器认为未成功投递消息
* subscribe后会执行到这里
*
* @param topic
* @param mqttMessage
*/
@Override
@LogTraceId
public void messageArrived(String topic, MqttMessage mqttMessage) {
/**
* message的处理请放入其他线程中,在该方法中消耗时间过多将影响qos为1或2的响应,使服务器认为未成功投递消息
* subscribe后会执行到这里
*
* @param topic
* @param mqttMessage
*/
@Override
@LogTraceId
public void messageArrived(String topic, MqttMessage mqttMessage) {
log.info("温度轨迹数据topic={},原始消息Id={}", topic, mqttMessage.getId());
try {
if (mqttMessage.getPayload() != null && mqttMessage.getPayload().length > 0) {
// 发布消息主题(这个就是你想要的数据,自己去解析吧)
JSONObject baseRequestVo = JSON.parseObject(new String(mqttMessage.getPayload()));
}
} catch (CustomException e2) {
log.error("推送温度轨迹数据业务异常=", e2);
}
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//publish可以执行到这里
log.info("==========deliveryComplete={}==========", iMqttDeliveryToken.isComplete());
}
}