- 启动mqtt 服务
- 创建项目,在项目中添加模块
- 添加文件夹
- 添加maven依赖
-
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.0</version> </dependency> </dependencies>
-
- 编写订阅程序 名字没起好 后面有时间再调整
-
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class EngineTemperatureSensor implements Callable<Void> { // ... private members omitted IMqttClient client; public static final String TOPIC = "testTopic1/003"; public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } CountDownLatch receivedSignal = new CountDownLatch(10); client.subscribe("testTopic1/003", (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted //print out the message System.out.println("Received message: " + new String(payload)); receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES); //print out the message System.out.println("Published message:2222222222222 " ); return null; } }
-
订阅:
-
import org.eclipse.paho.client.mqttv3.IMqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class EngineTemperatureSensor implements Callable<Void> { // ... private members omitted IMqttClient client; public static final String TOPIC = "testTopic1/003"; public EngineTemperatureSensor(IMqttClient client) { this.client = client; } @Override public Void call() throws Exception { if ( !client.isConnected()) { return null; } CountDownLatch receivedSignal = new CountDownLatch(10); client.subscribe("testTopic1/003", (topic, msg) -> { byte[] payload = msg.getPayload(); // ... payload handling omitted //print out the message System.out.println("Received message: " + new String(payload)); receivedSignal.countDown(); }); receivedSignal.await(1, TimeUnit.MINUTES); //print out the message System.out.println("Published message:2222222222222 " ); return null; } }
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
public class c5M {
//main5
public static void main(String[] args) {
System.out.println("Hello World");
String publisherId = UUID.randomUUID().toString();
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
IMqttClient subscriber = new MqttClient("tcp://127.0.0.1:1883", publisherId);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
subscriber.connect(options);
// 调用EngineTemperatureSensor
EngineTemperatureSensor sensor = new EngineTemperatureSensor(subscriber);
executor.submit(sensor); // 提交任务,但不阻塞主线程
// 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序
// 例如,你可以使用System.in.read()来等待用户输入
System.out.println("Press Enter to exit...");
new Scanner(System.in).nextLine(); // 等待用户输入
} catch (Exception e) {
//print e message
//print seperator line
System.out.println("))))))))))))))))))))))))");
System.out.println(e.getMessage());
throw new RuntimeException(e);
} finally {
// 确保最后关闭ExecutorService和MQTT客户端
executor.shutdown(); // 提交的任务将不再被接受
try {
// 等待任务完成(可选,取决于你是否需要确保所有任务都完成)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 取消正在执行的任务
}
} catch (InterruptedException ie) {
executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorService
Thread.currentThread().interrupt(); // 保留中断状态
}
// 关闭MQTT客户端(如果有必要的话)
// 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现
}
}
}
发布代码:
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class EngineTemperatureSensor implements Callable<Void> {
// ... private members omitted
IMqttClient client;
public static final String TOPIC = "testTopic1/003";
public EngineTemperatureSensor(IMqttClient client) {
this.client = client;
}
@Override
public Void call() throws Exception {
if ( !client.isConnected()) {
return null;
}
Random rnd = null;
//double temp = 80 + rnd.nextDouble() * 20.0;
double temp = 10 + 1.1 * 20.0;
byte[] payload = String.format("T:%04.2f",temp)
.getBytes();
MqttMessage msg2= new MqttMessage(payload);
msg2.setQos(0);
msg2.setRetained(true);
client.publish(TOPIC,msg2);
//print out the message
System.out.println("Published message: " + msg2);
return null;
}
}
import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class mainc3 {
// Main method
public static void main(String[] args) {
System.out.println("Hello World");
String publisherId = UUID.randomUUID().toString();
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
IMqttClient publisher = new MqttClient("tcp://127.0.0.1:1883", publisherId);
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);
// 调用EngineTemperatureSensor
EngineTemperatureSensor sensor = new EngineTemperatureSensor(publisher);
executor.submit(sensor); // 提交任务,但不阻塞主线程
// 这里可以添加代码来等待用户输入或者其他信号来安全地关闭程序
// 例如,你可以使用System.in.read()来等待用户输入
System.out.println("Press Enter to exit...");
new Scanner(System.in).nextLine(); // 等待用户输入
} catch (Exception e) {
//print e message
//print seperator line
System.out.println("))))))))))))))))))))))))");
System.out.println(e.getMessage());
throw new RuntimeException(e);
} finally {
// 确保最后关闭ExecutorService和MQTT客户端
executor.shutdown(); // 提交的任务将不再被接受
try {
// 等待任务完成(可选,取决于你是否需要确保所有任务都完成)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 取消正在执行的任务
}
} catch (InterruptedException ie) {
executor.shutdownNow(); // 当前线程被中断,需要关闭ExecutorService
Thread.currentThread().interrupt(); // 保留中断状态
}
// 关闭MQTT客户端(如果有必要的话)
// 注意:这里可能需要额外的逻辑来处理MQTT客户端的关闭,具体取决于你的实现
}
}
}