使用java +paho mqtt编写模拟发布温度及订阅的过程

  • 启动mqtt 服务
  •  创建项目,在项目中添加模块
  •  
  •  
  • 添加文件夹
    • 添加maven依赖
  •     <dependencies>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.2.0</version>
            </dependency>
    
    
        </dependencies>
    • 编写订阅程序  名字没起好 后面有时间再调整
  • import org.eclipse.paho.client.mqttv3.IMqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class EngineTemperatureSensor implements Callable<Void> {
    
        // ... private members omitted
        IMqttClient client;
        public static final String TOPIC = "testTopic1/003";
    
        public EngineTemperatureSensor(IMqttClient client) {
            this.client = client;
        }
    
        @Override
        public Void call() throws Exception {
            if ( !client.isConnected()) {
                return null;
            }
            CountDownLatch receivedSignal = new CountDownLatch(10);
            client.subscribe("testTopic1/003", (topic, msg) -> {
                byte[] payload = msg.getPayload();
                // ... payload handling omitted
                //print out the message
                System.out.println("Received message: " + new String(payload));
                receivedSignal.countDown();
            });
            receivedSignal.await(1, TimeUnit.MINUTES);
    
            //print out the message
            System.out.println("Published message:2222222222222 " );
    
    
    
            return null;
        }
    
    }
  • 订阅:

  • import org.eclipse.paho.client.mqttv3.IMqttClient;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    import java.util.Random;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    public class EngineTemperatureSensor implements Callable<Void> {
    
        // ... private members omitted
        IMqttClient client;
        public static final String TOPIC = "testTopic1/003";
    
        public EngineTemperatureSensor(IMqttClient client) {
            this.client = client;
        }
    
        @Override
        public Void call() throws Exception {
            if ( !client.isConnected()) {
                return null;
            }
            CountDownLatch receivedSignal = new CountDownLatch(10);
            client.subscribe("testTopic1/003", (topic, msg) -> {
                byte[] payload = msg.getPayload();
                // ... payload handling omitted
                //print out the message
                System.out.println("Received message: " + new String(payload));
                receivedSignal.countDown();
            });
            receivedSignal.await(1, TimeUnit.MINUTES);
    
            //print out the message
            System.out.println("Published message:2222222222222 " );
    
    
    
            return null;
        }
    
    }

import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

public class c5M {

    //main5
    public static void main(String[] args) {
        System.out.println("Hello World");

        String publisherId = UUID.randomUUID().toString();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {
            IMqttClient subscriber = new MqttClient("tcp://127.0.0.1:1883", publisherId);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            subscriber.connect(options);

            // 调用EngineTemperatureSensor
            EngineTemperatureSensor sensor = new EngineTemperatureSensor(subscriber);
            executor.submit(sensor); // 提交任务,但不阻塞主线程



            // 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序
            // 例如,你可以使用System.in.read()来等待用户输入
            System.out.println("Press Enter to exit...");
            new Scanner(System.in).nextLine(); // 等待用户输入

        } catch (Exception e) {
            //print e message
            //print seperator line
            System.out.println("))))))))))))))))))))))))");

            System.out.println(e.getMessage());
            throw new RuntimeException(e);

        } finally {
            // 确保最后关闭ExecutorService和MQTT客户端
            executor.shutdown(); // 提交的任务将不再被接受
            try {
                // 等待任务完成(可选,取决于你是否需要确保所有任务都完成)
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // 取消正在执行的任务
                }
            } catch (InterruptedException ie) {
                executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorService
                Thread.currentThread().interrupt(); // 保留中断状态
            }
            // 关闭MQTT客户端(如果有必要的话)
            // 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现
        }

    }

}

发布代码:

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class EngineTemperatureSensor implements Callable<Void> {

    // ... private members omitted
    IMqttClient client;
    public static final String TOPIC = "testTopic1/003";

    public EngineTemperatureSensor(IMqttClient client) {
        this.client = client;
    }

    @Override
    public Void call() throws Exception {
        if ( !client.isConnected()) {
            return null;
        }
        Random rnd = null;
        //double temp =  80 + rnd.nextDouble() * 20.0;
        double temp =  10 + 1.1 * 20.0;
        byte[] payload = String.format("T:%04.2f",temp)
                .getBytes();
        MqttMessage msg2= new MqttMessage(payload);

        msg2.setQos(0);
        msg2.setRetained(true);
        client.publish(TOPIC,msg2);

        //print out the message
        System.out.println("Published message: " + msg2);



        return null;
    }

}

 

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class mainc3 {

    // Main method
    public static void main(String[] args) {
        System.out.println("Hello World");

        String publisherId = UUID.randomUUID().toString();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        try {
            IMqttClient publisher = new MqttClient("tcp://127.0.0.1:1883", publisherId);

            MqttConnectOptions options = new MqttConnectOptions();
            options.setAutomaticReconnect(true);
            options.setCleanSession(true);
            options.setConnectionTimeout(10);
            publisher.connect(options);

            // 调用EngineTemperatureSensor
            EngineTemperatureSensor sensor = new EngineTemperatureSensor(publisher);
            executor.submit(sensor); // 提交任务,但不阻塞主线程



            // 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序
            // 例如,你可以使用System.in.read()来等待用户输入
            System.out.println("Press Enter to exit...");
            new Scanner(System.in).nextLine(); // 等待用户输入

        } catch (Exception e) {
            //print e message
            //print seperator line
            System.out.println("))))))))))))))))))))))))");

            System.out.println(e.getMessage());
            throw new RuntimeException(e);

        } finally {
            // 确保最后关闭ExecutorService和MQTT客户端
            executor.shutdown(); // 提交的任务将不再被接受
            try {
                // 等待任务完成(可选,取决于你是否需要确保所有任务都完成)
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // 取消正在执行的任务
                }
            } catch (InterruptedException ie) {
                executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorService
                Thread.currentThread().interrupt(); // 保留中断状态
            }
            // 关闭MQTT客户端(如果有必要的话)
            // 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现
        }





    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/734359.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

win11家庭版使用自带的Linux子系统并安装docker【全网整合版】

1. 电脑配置项 开发者选项 设置->系统->开发者选项 启用或关闭windows功能 控制面板->卸载程序->启用或关闭windows功能 勾选适用于Linux的Windows子系统和虚拟机平台并重启电脑(首次勾选选项时电脑提示重启) Hyper-V 关键点&#xff1a;win11家庭版在启动…

JAVA同城服务场馆门店预约系统支持H5小程序APP源码

&#x1f4f1;一键预约&#xff0c;畅享无忧体验&#x1f3e2; &#x1f680;一、开启预约新纪元 在繁忙的都市生活中&#xff0c;我们常常因为时间紧张而错过心仪的门店或场馆服务。然而&#xff0c;有了“门店场馆预约小程序”&#xff0c;这些问题都将迎刃而解。这款小程序…

vue3学习教程第四十节(pinia的用法注意事项解构store)

pinia 主要包括以下五部分&#xff0c;经常用到的是 store、state、getters、actions 以下使用说明&#xff0c;注意事项&#xff0c;仅限于 vue3 setup 语法糖中使用&#xff0c;若使用选项式 API 请直接查看官方文档&#xff1a; 一、前言&#xff1a; pinia 是为了探索 vu…

Python3简单实现与Java的Hutool库SM2的加解密互通

1、背景&#xff1a; 因业务需求&#xff0c;需要与某平台接口对接。平台是Java基于Hutool库实现的SM2加密解密&#xff0c;研究了下SM2的加解密算法&#xff0c;网上找的资料&#xff0c;都是说SM2【椭圆曲线】 公钥长【x,y分量 64字节】&#xff0c;私钥短【32字节】&#x…

网络富集显著性检验NEST(?)

https://doi.org/10.1002/hbm.26714 背景 一般情况下&#xff0c;研究者通过评估统计量较大的脑区与功能网络重叠的情况&#xff0c;或者计算网络的体素占比&#xff0c;来确定行为和功能网络的相关性。NEST能检测行为表型和大脑表型的相关性是否富集在特定的功能网络中。例如下…

Golang | Leetcode Golang题解之第166题分数到小数

题目&#xff1a; 题解&#xff1a; func fractionToDecimal(numerator, denominator int) string {if numerator%denominator 0 {return strconv.Itoa(numerator / denominator)}s : []byte{}if numerator < 0 ! (denominator < 0) {s append(s, -)}// 整数部分numer…

食谱API

在当今追求健康与美味完美结合的时代&#xff0c;获取准确而丰富的食品和营养信息变得至关重要。无论是热衷于探索世界各地美食的烹饪爱好者&#xff0c;还是对自身饮食营养严格把控的健康追求者&#xff0c;都离不开可靠的资源。幸运的是&#xff0c;现在有诸如 TheMealDB 和 …

win10环境配置ollama-ui运行llama3模型

先说我的笔记本电脑配置intel-i7-11390h,4核8处理器&#xff0c;内存16G。显卡NVIDA GeFroce MX450&#xff0c;2G显存&#xff0c;这是一台5000元左右的电脑。 我用它跑roop、sd1.5、ffusion2、ChatTTs还有pythonpytorch的自定义模型&#xff0c;现在用来跑llama3。当然&…

新手(初学者)学R语言第一课,从学正确导入数据开始

初看题目好像我在教你怎么导入数据&#xff0c;不不不&#xff0c;我是在教你正确的导入数据&#xff0c;不是说数据导入R就叫正确导入数据了。本章为新手教程&#xff0c;老手可以跳过。 这个内容早就想写了&#xff0c;今天有点空和大家聊一下。为什么R语言对于新手而言不太友…

【Autoware】Autoware.universe安装过程与问题记录

&#x1f60f;★,:.☆(&#xffe3;▽&#xffe3;)/$:.★ &#x1f60f; 这篇文章主要介绍Autoware.universe安装过程与问题记录。 无专精则不能成&#xff0c;无涉猎则不能通。——梁启超 欢迎来到我的博客&#xff0c;一起学习&#xff0c;共同进步。 喜欢的朋友可以关注一下…

基于MYSQL的JAVA初级优化措施

世界是草台班子,这句话视乎很流行! 经历过几家创业公司的项目优化,以及大公司项目. 很多优化非常初级,用心点都能自己找出来! 其实主要原因当初是为了赶进度,能省则省.什么设计啊? 什么性能压测啊. 都省掉吧! 质量都要靠测试人员帮忙找出来,更何况是性能问题呢! 那怕是配齐了…

临时关闭Windows安全中心

在使用WindowsOS是&#xff0c;微软安全中心是我们必不可少的安全防护&#xff0c;但有时我们也会产生想要将其关闭的需求&#xff0c;下面将要介绍如何临时关闭Windows的安全中心 一、打开安全中心、选择“病毒与威胁防护”&#xff0c;点击“管理设置” 之后将其实时保护关闭…

Vue3中的常见组件通信(超详细版)

Vue3中的常见组件通信 概述 ​ 在vue3中常见的组件通信有props、mitt、v-model、 r e f s 、 refs、 refs、parent、provide、inject、pinia、slot等。不同的组件关系用不同的传递方式。常见的撘配形式如下表所示。 组件关系传递方式父传子1. props2. v-model3. $refs4. 默认…

M12单端I/O预铸法兰插座A-code

M12单端I/O预铸法兰插座A-code概述 M12单端I/O预铸连接器A-code是一种常用于工业自动化领域的连接器件&#xff0c;主要用于传感器和执行器之间的信号传输。它的设计遵循国际标准IEC 61076-2-101&#xff0c;具有良好的防水防尘性能&#xff0c;通常达到IP67的保护等级。M12连…

自学鸿蒙HarmonyOS的ArkTS语言<一>基本语法

一、一个ArkTs的目录结构 二、一个页面的结构 A、装饰器 Entry 装饰器 : 标记组件为入口组件&#xff0c;一个页面由多个自定义组件组成&#xff0c;但是只能有一个组件被标记 Component : 自定义组件, 仅能装饰struct关键字声明的数据结构 State&#xff1a;组件中的状态变量…

【Linux硬盘读取】Windows下读取Linux系统的文件解决方案:Linux Reader4.5 By DiskInternals

前言 相信做机器视觉相关的很多人都会安装 Windows 和 Linux 双系统。在 Linux 下&#xff0c;我们可以很方便的访问Windows的磁盘&#xff0c;反过来却不行。但是这又是必须的。通过亲身体验&#xff0c;向大家推荐这么一个工具&#xff0c;可以让 Windows 方便的访问 Ext 2/3…

机器学习课程复习——逻辑回归

1. 激活函数 Q:激活函数有哪些? SigmoidS型函数Tanh 双曲正切函数

SpringBoot+Maven项目的配置构建

文章目录 1、application.properties2、pom.xml 1、application.properties 也可使用yml yaml #静态资源 spring.mvc.static-path-pattern/images/** #上传文件大小设置 spring.http.multipart.max-file-size10MB spring.http.multipart.max-request-size10MBspring.mvc.path…

50万定律:任何单位和任何职业,只要工资年收入大于50万,基本上都要牺牲个人生活,无论是医生还是教师...

“我今年30岁&#xff0c;在北京&#xff0c;年薪50万&#xff0c;但我一点也不快乐……” 朋友圈看到朋友的感慨&#xff0c;配图是深夜加班的CBD夜景&#xff0c;评论区不出所料&#xff0c;一半是羡慕&#xff0c;一半是“凡尔赛”。 年薪50万&#xff0c;在很多人眼里&am…

Spring的启动扩展点机制详解

在Java的世界中&#xff0c;我们知道Spring是当下最主流的开发框架&#xff0c;没有之一。而在使用Dubbo、Mybatis等开源框架时&#xff0c;我们发现可以采用和Spring完全一样的使用方式来使用它们。 可能你在平时的使用过程中并没有意识到这一点&#xff0c;但仔细想一想&…