EdgeX Foundry - 连接 MQTT 设备

文章目录

    • 一、概述
      • 1.安装说明
      • 2.MQTT 设备模拟器
        • 2.1.模拟器设计
        • 2.2.Spring Boot 程序源码
          • 2.2.1.MQTT
          • 2.2.2.JsonUtils
          • 2.2.3.Device
        • 2.3.程序配置
    • 二、连接 MQTT 设备
      • 1.docker-comepse
      • 2.设备配置文件
      • 3.启动 EdgeX Foundry
      • 4.访问 UI
        • 4.1. consul
        • 4.2. EdgeX Console
      • 5.创建 MQTT 设备
        • 5.1.创建设备配置文件
        • 5.2.添加设备
      • 6.运行模拟器
      • 7.测试
        • 7.1.命令
        • 7.2.事件
        • 7.3.读值

  • EdgeX Foundry
# EdgeX Foundry

https://iothub.org.cn/docs/edgex/
https://iothub.org.cn/docs/edgex/device/link-mqtt/

一、概述

1.安装说明

在这里插入图片描述

# 官方文档

https://docs.edgexfoundry.org/3.1/microservices/device/services/device-mqtt/Ch-ExamplesAddingMQTTDevice/

安装方式:

  • 使用 EdgeX Console 界面创建 MQTT 设备
  • 使用 Spring Boot 实现 MQTT 设备模拟器

2.MQTT 设备模拟器

2.1.模拟器设计

MQTT 设备模拟器使用 Spring Boot 开发。参考 mock-device.js。

mock-device.js

function getRandomFloat(min, max) {
    return Math.random() * (max - min) + min;
}

const deviceName = "my-custom-device";
let message = "test-message";
let json = {"name" : "My JSON"};

// DataSender sends async value to MQTT broker every 15 seconds
schedule('*/15 * * * * *', ()=>{
    var data = {};
    data.randnum = getRandomFloat(25,29).toFixed(1);
    data.ping = "pong"
    data.message = "Hello World"

    publish( 'incoming/data/my-custom-device/values', JSON.stringify(data));
});

// CommandHandler receives commands and sends response to MQTT broker
// 1. Receive the reading request, then return the response
// 2. Receive the set request, then change the device value
subscribe( "command/my-custom-device/#" , (topic, val) => {
    const words = topic.split('/');
    var cmd = words[2];
    var method = words[3];
    var uuid = words[4];
    var response = {};
    var data = val;

    if (method == "set") {
        switch(cmd) {
            case "message":
                message = data[cmd];
                break;
            case "json":
                json = data[cmd];
                break;
        }
    }else{
        switch(cmd) {
            case "ping":
                response.ping = "pong";
                break;
            case "message":
                response.message = message;
                break;
            case "randnum":
                response.randnum = 12.123;
                break;
            case "json":
                response.json = json;
                break;
        }
    }
    var sendTopic ="command/response/"+ uuid;
    publish( sendTopic, JSON.stringify(response));
});
2.2.Spring Boot 程序源码
2.2.1.MQTT
package com.iothub.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * Created by 传智播客*黑马程序员.
 */
@Component
public class EmqClient {
    
    private static final Logger log = LoggerFactory.getLogger(EmqClient.class);
    
    
    private IMqttClient mqttClient;
    
    @Autowired
    private MqttProperties mqttProperties;
    
    @Autowired
    private MqttCallback mqttCallback;
    
    
    @PostConstruct
    public void init(){
        MqttClientPersistence mempersitence = new MemoryPersistence();
        try {
            mqttClient = new MqttClient(mqttProperties.getBrokerUrl(),mqttProperties.getClientId(),mempersitence);
        } catch (MqttException e) {
            log.error("初始化客户端mqttClient对象失败,errormsg={},brokerUrl={},clientId={}",e.getMessage(),mqttProperties.getBrokerUrl(),mqttProperties.getClientId());
        }

    }

    /**
     * 连接broker
     * @param username
     * @param password
     */
    public void connect(String username,String password){
        MqttConnectOptions options = new MqttConnectOptions();
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setCleanSession(true);
        
        mqttClient.setCallback(mqttCallback);

        try {
            mqttClient.connect(options);
        } catch (MqttException e) {
            log.error("mqtt客户端连接服务端失败,失败原因{}",e.getMessage());
        }
    }

    /**
     * 断开连接
     */
    @PreDestroy
    public void disConnect(){
        try {
            mqttClient.disconnect();
        } catch (MqttException e) {
            log.error("断开连接产生异常,异常信息{}",e.getMessage());
        }
    }

    /**
     * 重连
     */
    public void reConnect(){
        try {
            mqttClient.reconnect();
        } catch (MqttException e) {
            log.error("重连失败,失败原因{}",e.getMessage());
        }
    }

    /**
     * 发布消息
     * @param topic
     * @param msg
     * @param qos
     * @param retain
     */
    public void publish(String topic, String msg, QosEnum qos, boolean retain){

        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setPayload(msg.getBytes());
        mqttMessage.setQos(qos.value());
        mqttMessage.setRetained(retain);
        try {
            mqttClient.publish(topic,mqttMessage);
        } catch (MqttException e) {
            log.error("发布消息失败,errormsg={},topic={},msg={},qos={},retain={}",e.getMessage(),topic,msg,qos.value(),retain);
        }

    }

    /**
     * 订阅
     * @param topicFilter
     * @param qos
     */
    public void subscribe(String topicFilter,QosEnum qos){
        try {
            mqttClient.subscribe(topicFilter,qos.value());
        } catch (MqttException e) {
            log.error("订阅主题失败,errormsg={},topicFilter={},qos={}",e.getMessage(),topicFilter,qos.value());
        }

    }

    /**
     * 取消订阅
     * @param topicFilter
     */
    public void unSubscribe(String topicFilter){
        try {
            mqttClient.unsubscribe(topicFilter);
        } catch (MqttException e) {
            log.error("取消订阅失败,errormsg={},topicfiler={}",e.getMessage(),topicFilter);
        }
    }
    
}
package com.iothub.mqtt;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.iothub.device.MqttData;
import com.iothub.utils.JsonUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Created by 传智播客*黑马程序员.
 */
@Component
public class MessageCallback implements MqttCallback {
    
    private static final Logger log = LoggerFactory.getLogger(MessageCallback.class);


    @Autowired
    private EmqClient emqClient;


    @Autowired
    private MqttData mqttData;



    /**
     * 丢失了对服务端的连接后触发的回调
     * @param cause
     */
    @Override
    public void connectionLost(Throwable cause) {
        // 资源的清理  重连
        log.info("丢失了对服务端的连接");
    }

    /**
     * 应用收到消息后触发的回调
     * @param topic
     * @param message
     * @throws Exception
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("订阅者订阅到了消息,topic={},messageid={},qos={},payload={}",
                topic,
                message.getId(),
                message.getQos(),
                new String(message.getPayload()));

        String[] buff = topic.split("/");
        String cmd = buff[2];
        String method = buff[3];
        String uuid = buff[4];
        String response = "{}";
        String data = new String(message.getPayload());


        if (method.equals("set")) {
            log.info("修改 message ={}", data);
            switch (cmd) {
                case "message":
                    String msg= JsonUtils.jsonToNodeString( data, "message");
                    mqttData.setMessage(msg);
                    break;
                case "json":
                    String json= JsonUtils.jsonToNodeString( data, "json");
                    mqttData.setJson(json);
                    break;
            }
        } else {
            switch (cmd) {
                case "ping":
                    response = mqttData.getPing();
                    break;
                case "message":
                    response = mqttData.getMessage();
                    break;
                case "randnum":
                    response = mqttData.getRandnum();
                    break;
                case "json":
                    response = mqttData.getJson();
                    break;
            }


        }

        emqClient.publish("command/response/" + uuid, response, QosEnum.QoS1,false);
    }

    /**
     * 消息发布者消息发布完成产生的回调
     * @param token
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        int messageId = token.getMessageId();
        String[] topics = token.getTopics();
        log.info("消息发布完成,messageid={},topics={}",messageId,topics);
    }
}
package com.iothub.mqtt;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

/**
 * Created by 传智播客*黑马程序员.
 */
@Configuration
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
    
    private String brokerUrl;
    
    private String clientId;
    
    private String username;
    
    private String password;


    public String getBrokerUrl() {
        return brokerUrl;
    }

    public void setBrokerUrl(String brokerUrl) {
        this.brokerUrl = brokerUrl;
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "MqttProperties{" +
                "brokerUrl='" + brokerUrl + '\'' +
                ", clientId='" + clientId + '\'' +
                ", username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}
package com.iothub.mqtt;

/**
 * Created by 传智播客*黑马程序员.
 */
public enum QosEnum {
    QoS0(0),QoS1(1),QoS2(2);


    private final int value;

    QosEnum(int value) {
        this.value = value;
    }
    
    public int value(){
        return this.value;
    }
}
2.2.2.JsonUtils
package com.iothub.utils;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;

/**
 * JSON转换工具类。
 *
 * @date 2017-06-29 10:07:51
 */
public final class JsonUtils {
	/**
	 * 日期字符串的格式
	 */
	public static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

	private static ObjectMapper objectMapper = new ObjectMapper();

	static {
		// 设置日期字符串的格式
		objectMapper.setDateFormat(new SimpleDateFormat(DATE_FORMAT));

		// 反序列化时,允许字段名没有双引号
		objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);

		// 忽略未知属性
		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

		// 反序列化时,忽略未知的属性
		objectMapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
	}

	private JsonUtils() {
	}

	/**
	 * 获取ObjectMapper
	 *
	 * @return
	 */
	public static ObjectMapper getObjectMapper() {
		return objectMapper;
	}

	/**
	 * 对象转JSON字符串
	 *
	 * @param obj 对象
	 * @return JSON字符串
	 */
	public static String toJSONString(Object obj) {
		try {
			return objectMapper.writeValueAsString(obj);
		}
		catch (JsonProcessingException e) {
			throw new RuntimeException("JSON序列化时出现错误!", e);
		}
	}

	/**
	 * 对象转JSON字符串。
	 *
	 * @param obj        对象。
	 * @param dateFormat 日期格式。
	 * @return JSON字符串。
	 */
	public static String toJSONString(Object obj, String dateFormat) {
		if (StringUtils.isBlank(dateFormat)) {
			dateFormat = DATE_FORMAT;
		}

		objectMapper.setDateFormat(new SimpleDateFormat(dateFormat));
		try {
			return objectMapper.writeValueAsString(obj);
		}
		catch (JsonProcessingException e) {
			throw new RuntimeException("JSON序列化时出现错误!", e);
		}
	}

	/**
	 * 根据节点解析json中对象
	 * @param json json
	 * @param key 对象key
	 * @param clazz clazz
	 * @return json中对象
	 */
	public static <T> T jsonToNodeObject(String json,String key,Class<T> clazz){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			return jsonNode.get(key).traverse(objectMapper).readValueAs(clazz);
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误");
		}
	}

	/**
	 * 根据节点解析json中嵌套对象
	 * @param json json
	 * @param key 对象key
	 * @param clazz clazz
	 * @param nodeNames 节点名
	 * @return json中对象
	 */
	public static <T> T jsonToNodeObject(String json,String key,Class<T> clazz,String... nodeNames){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			for (String nodeName : nodeNames) {
				jsonNode = jsonNode.path(nodeName);
			}
			return jsonNode.get(key).traverse(objectMapper).readValueAs(clazz);
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}


	public static String jsonToNodeString(String json,String key){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			return jsonNode.get(key).asText();
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}

	public static String jsonToNodeString(String json,String key,String nodeName){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			jsonNode = jsonNode.path(nodeName);
			return jsonNode.get(key).toString();
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}


	public static String jsonToNodeString(String json,String key,String... nodeName){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			for (String name : nodeName) {
				jsonNode = jsonNode.path(name);
			}
			return jsonNode.get(key).asText();
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}

	public static JsonNode jsonToNode(String json,String key,String... nodeName){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			for (String name : nodeName) {
				jsonNode = jsonNode.path(name);
			}
			return jsonNode.get(key);
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}

	public static ArrayNode jsonToArrayString(String json, String key, String... nodeName){
		try {
			JsonNode jsonNode = objectMapper.readTree(json);
			for (String name : nodeName) {
				jsonNode = jsonNode.path(name);
			}
			return (ArrayNode) jsonNode.get(key);
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}



	/**
	 * 根据节点
	 * @param json
	 * @param key
	 * @param clazz
	 * @param <T>
	 * @return
	 */
	public static <T> T jsonToValue(String json,String key,Class<T> clazz){
		try {
			Map map = JsonUtils.jsonToMap(json);
			return (T)map.get(key);
		} catch (Exception e) {
			throw new RuntimeException("json解析节点错误",e);
		}
	}

	/**
	 * JSON字符串转对象。
	 *
	 * @param json  JSON字符串。
	 * @param clazz 对象类型。
	 * @param <T>   类型参数。
	 * @return 指定类型的对象。
	 */
	public static <T> T jsonToObject(String json, Class<T> clazz) {
		if (StringUtils.isBlank(json)) {
			throw new IllegalArgumentException("参数json不能为空!");
		}
		if (clazz == null) {
			throw new IllegalArgumentException("参数clazz不能为空!");
		}

		try {
			return objectMapper.readValue(json.getBytes(StandardCharsets.UTF_8), clazz);
		}
		catch (Exception e) {
			throw new RuntimeException("JSON字符串转对象时出现错误!", e);
		}
	}

	/**
	 * MAP转对象。
	 *
	 * @param map  MAP。
	 * @param clazz 对象类型。
	 * @param <T>   类型参数。
	 * @return 指定类型的对象。
	 */
	public static <T> T mapToObject(Map map, Class<T> clazz) {
		if (map == null) {
			throw new IllegalArgumentException("参数map不能为空!");
		}
		if (clazz == null) {
			throw new IllegalArgumentException("参数clazz不能为空!");
		}

		try {
			return objectMapper.convertValue(map, clazz);
		}
		catch (Exception e) {
			throw new RuntimeException("MAP字符串转对象时出现错误!", e);
		}
	}

	/**
	 * JSON字符串转对象数组。
	 *
	 * @param json  JSON字符串。
	 * @param clazz 数组元素的对象类型。
	 * @param <T>   类型参数。
	 * @return 指定类型的对象数组。
	 */
	public static <T> List<T> jsonToList(String json, Class<T> clazz) {
		if (StringUtils.isBlank(json)) {
			throw new IllegalArgumentException("参数json不能为空!");
		}
		if (clazz == null) {
			throw new IllegalArgumentException("参数clazz不能为空!");
		}

		try {
			JavaType javaType = objectMapper.getTypeFactory().constructParametricType(List.class, clazz);
			return objectMapper.readValue(json.getBytes(StandardCharsets.UTF_8), javaType);
		}
		catch (Exception e) {
			throw new RuntimeException("JSON字符串转数组时出现错误!", e);
		}
	}

	/**
	 * JSON字符串转MAP。
	 *
	 * @param json JSON字符串。
	 * @return MAP对象。
	 */
	public static Map jsonToMap(String json) {
		if (StringUtils.isBlank(json)) {
			throw new IllegalArgumentException("参数json不能为空!");
		}

		try {
			return objectMapper.readValue(json.getBytes(StandardCharsets.UTF_8), Map.class);
		}
		catch (Exception e) {
			throw new RuntimeException("JSON字符串转对象时出现错误!", e);
		}
	}


	/**
	 * 拷贝对象属性,并返回指定类型的新对象。
	 *
	 * @param src   源对象。
	 * @param clazz 目标对象类型。
	 * @param <T>   目标对象类型参数。
	 * @return 与原对象属性值相同的新对象。
	 */
	public static <T> T copyProperties(Object src, Class<T> clazz) {
		return jsonToObject(toJSONString(src), clazz);
	}

	/**
	 * 拷贝对象属性,并返回同类型的新对象。
	 *
	 * @param src 源对象。
	 * @param <T> 目标对象类型参数。
	 * @return 与原对象属性值相同的新对象。
	 */
	public static <T> T copyProperties(T src) {

		return (T) copyProperties(src, src.getClass());
	}
	/**
	 * @Description map转JSON
	 * @Date 16:32 2020/7/6
	 * @param map map对象
	 * @return JSON字符串
	 **/
	public static String mapToJson(Map map) {
		if (map == null) {
			throw new IllegalArgumentException("参数map不能为空!");
		}
		try {
			return objectMapper.writeValueAsString(map);
		} catch (JsonProcessingException e) {
			throw new RuntimeException("Map转JSON时出现错误!", e);
		}
	}
}
2.2.3.Device
package com.iothub.device;

import com.iothub.mqtt.EmqClient;
import com.iothub.mqtt.MqttProperties;
import com.iothub.mqtt.QosEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Component
public class MqttDevice {

    @Autowired
    private EmqClient emqClient;

    @Autowired
    private MqttData mqttData;

    @Autowired
    private MqttProperties properties;

    @PostConstruct
    public void init(){
        //连接服务端
        emqClient.connect(properties.getUsername(),properties.getPassword());
        //订阅一个主题
        emqClient.subscribe("command/my-custom-device/#", QosEnum.QoS1);
    }


    @Scheduled(fixedRate = 50000)
    public void publish(){

        String data = getData(1);

        emqClient.publish("incoming/data/my-custom-device/values",data,
                QosEnum.QoS1,false);
    }


    private String getData(Integer type){

        if (type == 1) {
            // 携带时间戳
            String data = mqttData.getValues();
            return data;

        } else if (type == 2) {
            // 不携带时间戳
            String data = "";
            return data;
        }else {
            // 数组
            String data = "[{\"key1\":\"value1\"}, {\"key2\":true}]";
            return data;
        }
    }
}

package com.iothub.device;
import com.iothub.utils.JsonUtils;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class MqttData {

    public MqttData(){
        ping = "pong ......";
        randnum = 1000;
        message = "Hello World !!!";
        //json = "{\"name\" : \"My JSON\"}";
        Map<Object,Object> map = new HashMap<>();
        map.put("name", "My JSON ......");
        json = JsonUtils.toJSONString(map);
    }

    public void setRandnum(float randnum) {
        this.randnum = randnum;
    }

    public void setPing(String ping) {
        this.ping = ping;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public void setJson(String json) {
        this.json = json;
    }

    public String getRandnum() {
        Map<Object,Object> map = new HashMap<>();
        map.put("randnum", randnum );
        return JsonUtils.toJSONString(map);
    }

    public String getPing() {
        Map<Object,Object> map = new HashMap<>();
        map.put("ping", ping);
        return JsonUtils.toJSONString(map);
    }

    public String getMessage() {
        Map<Object,Object> map = new HashMap<>();
        map.put("message", message);
        return JsonUtils.toJSONString(map);
    }

    public String getJson() {
        Map<Object,Object> map = new HashMap<>();
        map.put("json", json);
        return JsonUtils.toJSONString(map);
    }

    public String getValues() {
        Map<Object,Object> map = new HashMap<>();
        map.put("randnum", randnum );
        map.put("ping", ping);
        map.put("message", message);
        return JsonUtils.toJSONString(map);
    }

    private float randnum;

    private String ping;

    private String message;

    private String json;
}
2.3.程序配置
  • pom.xml
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
  • application.yaml
server:
  port: 8888
spring:
  application:
    name: device-mqtt-simulator

mqtt:
  broker-url: tcp://192.168.202.233:1883
  client-id: device-mqtt-simulator
  username:
  password:

二、连接 MQTT 设备

1.docker-comepse

# 1.克隆 edgex-compose
$ git clone git@github.com:edgexfoundry/edgex-compose.git 
$ git clone https://github.com/edgexfoundry/edgex-compose.git
$ cd edgex-compose 
$ git checkout v3.1


# 2.生成 docker-compose.yml 文件(注意这包括 mqtt-broker)
$ cd compose-builder
$ make gen ds-mqtt mqtt-broker no-secty


# 3.检查生成的文件
$ ls | grep 'docker-compose.yml'
docker-compose.yml
[root@edgex mqtt-device]# git clone https://github.com/edgexfoundry/edgex-compose.git
Cloning into 'edgex-compose'...
remote: Enumerating objects: 4779, done.
remote: Counting objects: 100% (2916/2916), done.
remote: Compressing objects: 100% (173/173), done.
remote: Total 4779 (delta 2831), reused 2804 (delta 2741), pack-reused 1863
Receiving objects: 100% (4779/4779), 1.22 MiB | 450.00 KiB/s, done.
Resolving deltas: 100% (4042/4042), done.


[root@edgex mqtt-device]# ll
total 4
drwxr-xr-x. 6 root root 4096 Feb  1 04:10 edgex-compose


[root@edgex mqtt-device]# cd edgex-compose/
[root@edgex edgex-compose]# git checkout v3.1
Note: checking out 'v3.1'.

You are in 'detached HEAD' state. You can look around, make experimental
changes and commit them, and you can discard any commits you make in this
state without impacting any branches by performing another checkout.

If you want to create a new branch to retain commits you create, you may
do so (now or later) by using -b with the checkout command again. Example:

  git checkout -b new_branch_name

HEAD is now at 488a3fe... Merge pull request #424 from lenny-intel/device-mqtt-secure-mode-napa


[root@edgex edgex-compose]# cd compose-builder/

[root@edgex compose-builder]# make gen ds-mqtt mqtt-broker no-secty
echo MQTT_VERBOSE=
MQTT_VERBOSE=
docker compose  -p edgex -f docker-compose-base.yml -f add-device-mqtt.yml -f add-mqtt-broker-mosquitto.yml convert > docker-compose.yml
rm -rf ./gen_ext_compose


[root@edgex compose-builder]# ls | grep 'docker-compose.yml'
docker-compose.yml

2.设备配置文件

1.设备配置文件

name: "my-custom-device-profile"
manufacturer: "iot"
model: "MQTT-DEVICE"
description: "Test device profile"
labels:
  - "mqtt"
  - "test"
deviceResources:
  -
    name: randnum
    isHidden: true
    description: "device random number"
    properties:
      valueType: "Float32"
      readWrite: "R"
  -
    name: ping
    isHidden: true
    description: "device awake"
    properties:
      valueType: "String"
      readWrite: "R"
  -
    name: message
    isHidden: false
    description: "device message"
    properties:
      valueType: "String"
      readWrite: "RW"
  -
    name: json
    isHidden: false
    description: "JSON message"
    properties:
      valueType: "Object"
      readWrite: "RW"
      mediaType: "application/json"

deviceCommands:
  -
    name: values
    readWrite: "R"
    isHidden: false
    resourceOperations:
        - { deviceResource: "randnum" }
        - { deviceResource: "ping" }
        - { deviceResource: "message" }

2.设备配置

使用此配置文件来定义设备和调度作业。device-mqtt 在启动时生成一个相对实例。

# Pre-define Devices
deviceList:
- name: "my-custom-device"
  profileName: "my-custom-device-profile"
  description: "MQTT device is created for test purpose"
  labels: [ "MQTT", "test" ]
  protocols:
    mqtt:
      CommandTopic: "command/my-custom-device"
  autoEvents:
    - interval: "30s"
      onChange: false
      sourceName: "message"

CommandTopic 用于发布GET或SET命令请求

3.启动 EdgeX Foundry

使用以下命令部署 EdgeX:

$ cd edgex-compose/compose-builder
$ docker compose pull
$ docker compose -f docker-compose.yml up -d
$ docker compose up -d


# 修改配置文件
替换IP地址 127.0.0.1 为 0.0.0.0
# docker compose pull

# docker compose up -d

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4.访问 UI

4.1. consul
# 访问地址
http://192.168.202.233:8500

在这里插入图片描述

4.2. EdgeX Console
# 访问地址
http://192.168.202.233:4000/

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

5.创建 MQTT 设备

5.1.创建设备配置文件

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

设备配置文件

name: "my-custom-device-profile"
manufacturer: "iot"
model: "MQTT-DEVICE"
description: "Test device profile"
labels:
  - "mqtt"
  - "test"
deviceResources:
  -
    name: randnum
    isHidden: true
    description: "device random number"
    properties:
      valueType: "Float32"
      readWrite: "R"
  -
    name: ping
    isHidden: true
    description: "device awake"
    properties:
      valueType: "String"
      readWrite: "R"
  -
    name: message
    isHidden: false
    description: "device message"
    properties:
      valueType: "String"
      readWrite: "RW"
  -
    name: json
    isHidden: false
    description: "JSON message"
    properties:
      valueType: "Object"
      readWrite: "RW"
      mediaType: "application/json"

deviceCommands:
  -
    name: values
    readWrite: "R"
    isHidden: false
    resourceOperations:
        - { deviceResource: "randnum" }
        - { deviceResource: "ping" }
        - { deviceResource: "message" }
5.2.添加设备

设备配置

使用此配置文件来定义设备和调度作业。device-mqtt 在启动时生成一个相对实例。

# Pre-define Devices
deviceList:
- name: "my-custom-device"
  profileName: "my-custom-device-profile"
  description: "MQTT device is created for test purpose"
  labels: [ "MQTT", "test" ]
  protocols:
    mqtt:
      CommandTopic: "command/my-custom-device"
  autoEvents:
    - interval: "30s"
      onChange: false
      sourceName: "message"

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.运行模拟器

在这里插入图片描述

7.测试

7.1.命令

在这里插入图片描述

7.2.事件

在这里插入图片描述

{
	"apiVersion": "v3",
	"id": "af46944b-e7e0-4d8b-bb6d-18d42721399b",
	"deviceName": "my-custom-device",
	"profileName": "my-custom-device-profile",
	"sourceName": "message",
	"origin": 1708341080603620600,
	"readings": [{
		"id": "edb1630b-7d15-49b8-97e3-1662520f7799",
		"origin": 1708341080603617300,
		"deviceName": "my-custom-device",
		"resourceName": "message",
		"profileName": "my-custom-device-profile",
		"valueType": "String",
		"value": "1111111"
	}]
}


{
	"apiVersion": "v3",
	"id": "99809eb7-83b9-49e3-9a5b-d65adc38a522",
	"deviceName": "my-custom-device",
	"profileName": "my-custom-device-profile",
	"sourceName": "values",
	"origin": 1708341090003785700,
	"readings": [{
		"id": "668df4eb-1404-4267-b0ee-464f1296e50b",
		"origin": 1708341090003772400,
		"deviceName": "my-custom-device",
		"resourceName": "randnum",
		"profileName": "my-custom-device-profile",
		"valueType": "Float32",
		"value": "2.650000e+01"
	}, {
		"id": "65067425-d134-4fc3-922f-2337d228cf0f",
		"origin": 1708341090003773700,
		"deviceName": "my-custom-device",
		"resourceName": "ping",
		"profileName": "my-custom-device-profile",
		"valueType": "String",
		"value": "pong"
	}, {
		"id": "fffe8f12-536c-43d9-9989-9d450d3b0b7b",
		"origin": 1708341090003774200,
		"deviceName": "my-custom-device",
		"resourceName": "message",
		"profileName": "my-custom-device-profile",
		"valueType": "String",
		"value": "Hello World"
	}]
}
7.3.读值

在这里插入图片描述

{
	"id": "aa29ebc7-2d31-4a4d-b58a-a8d85ba7903e",
	"origin": 1708341330008859000,
	"deviceName": "my-custom-device",
	"resourceName": "message",
	"profileName": "my-custom-device-profile",
	"valueType": "String",
	"value": "Hello World"
}

{
	"id": "448591a3-d4d7-4188-9943-88a289f9f54a",
	"origin": 1708341345006348800,
	"deviceName": "my-custom-device",
	"resourceName": "randnum",
	"profileName": "my-custom-device-profile",
	"valueType": "Float32",
	"value": "2.530000e+01"
}

{
	"id": "4af98f08-888a-495d-a07e-4c87dc6d2b82",
	"origin": 1708341345006350000,
	"deviceName": "my-custom-device",
	"resourceName": "ping",
	"profileName": "my-custom-device-profile",
	"valueType": "String",
	"value": "pong"
}

{
	"id": "26600864-c850-4d45-bf42-835dcf966249",
	"origin": 1708341345006350600,
	"deviceName": "my-custom-device",
	"resourceName": "message",
	"profileName": "my-custom-device-profile",
	"valueType": "String",
	"value": "Hello World"
}

{
	"id": "d70a679e-3532-4f7d-a3d1-0e53c8ad5f08",
	"origin": 1708341315007203800,
	"deviceName": "my-custom-device",
	"resourceName": "message",
	"profileName": "my-custom-device-profile",
	"valueType": "String",
	"value": "Hello World"
}
  • EdgeX Foundry
# EdgeX Foundry

https://iothub.org.cn/docs/edgex/
https://iothub.org.cn/docs/edgex/device/link-mqtt/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/432703.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

国产电脑替代后,办公软件不兼容,国产机成了摆设如何解决?

随着国家信创战略的推进&#xff0c;许多党政机关、行政事业单位、央国企已经采购了大量的国产CPU和国产操作系统的办公电脑&#xff0c;然而&#xff0c;在替代Windows系统电脑的过程中&#xff0c;许多企业遇到了一个严重的问题&#xff1a;办公软件的不兼容性。这导致许多国…

dolphinescheduler调用API

&#xff08;作者&#xff1a;陈玓玏&#xff09; 1. 打开api文档 api文档地址&#xff1a;http://{api server ip}:12345/dolphinscheduler/swagger-ui/index.html?languagezh_CN&langcn&#xff0c;我是用k8s部署的&#xff0c;所以ip和端口是由service决定的&#xf…

人事档案转出需要注意哪些方面

人事档案转出是指将员工的人事档案从一个部门、公司或组织转移到另一个部门、公司或组织的过程。这个过程需要注意以下几个方面&#xff1a; 1.法律合规&#xff1a;在进行人事档案转出前&#xff0c;要确保遵守相关的法律法规和公司内部规定。例如&#xff0c;要确保有合法的授…

华为Web举例:私网用户通过三元组NAT访问Internet

Web举例&#xff1a;私网用户通过三元组NAT访问Internet 介绍私网用户通过三元组NAT访问Internet的配置举例。 组网需求 某公司在网络边界处部署了FW作为安全网关。为了使私网中10.1.1.0/24网段的用户可以正常访问Internet&#xff0c;需要在FW上配置源NAT策略。除了公网接口…

JavaScript的for循环与双重for循环,前端游戏开发

学习路线 第一阶段&#xff1a;网页制作 HTML&#xff1a;常用标签&#xff0c;锚点&#xff0c;列表标签&#xff0c;表单标签&#xff0c;表格标签&#xff0c;标签分类&#xff0c;标签语义化&#xff0c;注释&#xff0c;字符实体 CSS&#xff1a;CSS介绍&#xff0c;全局…

【SpringBoot3.x教程02】SpringBoot配置文件详解

前言&#xff1a;什么是配置文件 SpringBoot的配置文件是指用于定义和管理SpringBoot应用程序配置的文件。这些配置文件允许开发者调整和控制应用程序的行为&#xff0c;而无需改变代码。主要有两种格式的配置文件&#xff1a; 1、application.properties&#xff1a;这是一种使…

Spring Security的API Key实现SpringBoot 接口安全

Spring Security的API Key实现SpringBoot 接口安全 Spring Security 提供了各种机制来保护我们的 REST API。其中之一是 API 密钥。API 密钥是客户端在调用 API 调用时提供的令牌。 在本教程中&#xff0c;我们将讨论如何在Spring Security中实现基于API密钥的身份验证。 API…

vue2+vite+@vitejs/plugin-vue2可以使用require引用图片资源

很多文章都说vite不能用require&#xff0c;vue3vite确实是这样的&#xff0c;但今天无意间发现vue2vite中是可以使用require引用资源的 vue3搭配vite一般使用的是vitejs/plugin-vue解析vue语法&#xff0c;而vue2使用的则是另一个插件vitejs/plugin-vue2插件解析vue语法 看下…

易基因:NAR:RCMS编辑系统在特定细胞RNA位点的靶向m5C甲基化和去甲基化研究|项目文章

喜讯&#xff01;易基因表观转录组学RNA-BS技术服务见刊《核酸研究》 大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 2024年2月15日&#xff0c;吉林大学张涛、赵飞宇、李金泽为共同第一作者&#xff0c;吉林大学李占军、隋婷婷及赖良…

Oracle 的同义词(Synonym) 作用

Oracle 同义词(Synonym) 是数据库对象的一个别名&#xff0c;Oracle 可以为表、视图、序列、过程、函数、程序包等指定一个别名。同义词有两种类型&#xff1a; 私有同义词&#xff1a;拥有 CREATE SYNONYM 权限的用户(包括非管理员用户)即可创建私有同义词&#xff0c;创建的…

蓝桥杯嵌入式省赛模板构建——测量两路频率和占空比

结合测量一路PWM频率编程 测量占空比&#xff1a;测量高电平持续时间和周期 思路&#xff1a; ①.第一次上升沿中断&#xff0c;清零计数器&#xff0c;开始计时并改成下降沿中断 ②.下降沿中断&#xff0c;获取计数值T1&#xff0c;并改为上升沿中断 ③.第二次上升沿中断…

一个本科渣渣是怎么逆袭从咸鱼到Offer收割机的,Android开发了解这些自然无惧面试

面试题 一般Android面试分为两部分&#xff1a;Java部分和Android部分&#xff0c;下面说一下自己面试过程遇到的一些具体题目和一些相关知识点。 一 JAVA相关 点击领取完整开源项目《安卓学习笔记总结最新移动架构视频大厂安卓面试真题项目实战源码讲义》 1&#xff09;JAVA…

RabbitMQ(任务模型,交换机(广播,订阅,通配符订阅))

一.WorkQueues模型 WorkQueues(任务模式):让多个消费者绑定到一个队列&#xff0c;共同消费队列中的消息。 架构: 所需场景: 当消息处理比较耗时的时候&#xff0c;可能生产消息的速度会远远大于消息的消费速度。长此以往&#xff0c;消息就会堆积越来越多&#xff0c;无法及…

工业镜头的重要参数之视场、放大倍率、芯片尺寸--51camera

今天来简单介绍下工业镜头中常用的参数中的三个&#xff1a; 1、视场 视场&#xff08;FOV&#xff09;也称视野,是指能被视觉系统观察到的物方可视范围。 对于镜头而言&#xff0c;可观察到的视场跟镜头放大倍率及相机芯片选择有关。因此需要根据被观察物体的尺寸&#xff…

docker安装和配置minio

1. 安装镜像 docker pull minio/minio:latest上方的命令是拉取最新版本, 目前我的版本为1.29.0 2. 运行minio 客户端端口号: 29000 ,服务端端口号: 29090 docker run -p 29000:29000 -p 29090:29090 \--name minio \-d --restartalways \-e "MINIO_ACCESS_KEYminioadm…

《花书》学习:LeNet

# LeNet网络架构 正常的应该是&#xff1a;输入→操作→输出 但都简化 要么省略 操作 要么省略 输出 # LeNet第一个卷积层详解

华为OD七日集训第1期 - 按算法分类,由易到难,循序渐进,玩转OD

目录 一、适合人群二、本期训练时间三、如何参加四、七日集训第 1 期&#xff0c;极简题&#xff0c;培养刷题兴趣五、精心挑选21道高频100分经典题目&#xff0c;作为入门。第1天、逻辑分析第2天、字符串处理第3天、数组第4天、数据结构第5天、栈第6天、双指针第7天、二分查找…

Java面试题【必知必会】Spring常见面试题(2024)

近期一直在准备面试&#xff0c;所以为了巩固知识&#xff0c;也为了梳理&#xff0c;整理了一些java的基础面试题&#xff01;同时也希望各位英雄和女侠能够补充&#xff01;不胜荣幸&#xff01;&#xff01;&#xff01; 1.spring是什么&#xff1f;它的优点是什么&#xff…

灯塔:CSS笔记(1)

CSS&#xff1a;层叠样式表 所谓层叠 即叠加的意思&#xff0c;表示样式可以一层一层的层叠覆盖 css写在style标签中&#xff0c;style标签一般写在head标签里面&#xff0c;title标签下面 <!DOCTYPE html> <html lang"en"> <head><meta cha…

js设计模式:解释器模式

作用: 对文本进行解释和编译的时候,就会用到解释器模式 比如你写了一段js代码,js引擎就会去解释并执行这段代码 webpack中的各种loader就是用来解释各种文件类型的,并将其解释为js可识别的代码 示例: //翻译词库const wordList [{text:韩信前期有蓝有红,必须拿二杀。你要是…