加入MQTT库
加入库可以直接下载对应的jar包,也可以在build.gradle里导入,然后加载进入。
这里直接在build.gradle加库
dependencies {
implementation(libs.appcompat)
implementation(libs.material)
implementation(libs.activity)
implementation(libs.constraintlayout)
testImplementation(libs.junit)
androidTestImplementation(libs.ext.junit)
androidTestImplementation(libs.espresso.core)
// MQTT库
implementation("org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5")
implementation("org.eclipse.paho:org.eclipse.paho.android.service:1.1.1")
}
然后它还需要一些网络权限
在AndroidManifest.xml中加入
<!-- 获取网络权限 -->
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<uses-permission android:name="android.permission.READ_PHONE_STATE"/>
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<!-- 获取网络权限 -->
还要加入下面这个,啥作用不知道,别人加我也加的。
<service android:name="org.eclipse.paho.android.service.MqttService">
</service>
连接MQTT服务器
Activity 我是在Activity里面连接MQTT服务器的,然后采用回调函数的形式获取订阅信息的。
在Fragment里面连接时,总会断开重连,别问我为什么,我也不知道,我自己试出来的,不懂原理。
package com.example.myapplication;
//import android.graphics.drawable.Drawable;
import android.content.Context;
import android.os.Bundle;
import android.provider.Settings;
import android.widget.EditText;
//import android.widget.Toast;
//import android.view.View;
//import android.widget.Button;
//import android.widget.ImageView;
import androidx.activity.EdgeToEdge;
//import androidx.annotation.Nullable;
import androidx.appcompat.app.AppCompatActivity;
//import androidx.core.content.ContextCompat;
import androidx.core.graphics.Insets;
import androidx.core.view.ViewCompat;
import androidx.core.view.WindowInsetsCompat;
import androidx.fragment.app.FragmentManager;
import androidx.fragment.app.FragmentTransaction;
import androidx.fragment.app.Fragment;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
//import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class MainActivity extends AppCompatActivity {
//************************参数定义******************
private fragment_page1 page1;
private FragmentManager fragmentManager;
private FragmentTransaction fragmentTransaction;
private Activitytopage1 topage1;
public MqttClient mqtt_client; //创建一个mqtt_client对象
String mqtt_sub_topic = "xxxx"; //需要订阅的主题
private String serverUri = "tcp://xxxxx:1883"; //这里可以填上各种云平台的物联网云平台的域名+1883端口号,什么阿里云腾讯云百度云天工物接入都可以,
// 这里我填的是我在我的阿里云服务器上搭建的EMQ平台的地址,
// 注意:前缀“tcp://”不可少,之前我没写,怎么都连不上,折腾了好久
private String userName = "xxxx"; //然后是你的用户名,阿里云腾讯云百度云天工物接入这些平台你新建设备后就自动生成了
private String passWord = "xxxxxx"; //用户名对应的密码,同样各种云平台都会对应生成密码,这里我的EMQ平台没做限制,所以用户名和密码可以随便填写
private String clientId = "xxxx"; //clientId
private EditText text;
public MqttConnectOptions options;//配置 保存控制客户端连接到服务器的方式的选项集。
//**************************************************************
//************************************************Activity非UI操作函数
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
EdgeToEdge.enable(this);
setContentView(R.layout.activity_main);
ViewCompat.setOnApplyWindowInsetsListener(findViewById(R.id.main), (v, insets) -> {
Insets systemBars = insets.getInsets(WindowInsetsCompat.Type.systemBars());
v.setPadding(systemBars.left, systemBars.top, systemBars.right, systemBars.bottom);
return insets;
});
//获取手机ID作为唯一标识
clientId = getAndroidID(this);
//连接MQTT
mqtt_init_Connect();
//创建页面1
page1 = new fragment_page1();
//添加fragment管理器
fragmentManager = getSupportFragmentManager();
//添加fragment切换器
fragmentTransaction = fragmentManager.beginTransaction();
//切换页面
fragmentTransaction.replace(R.id.fragment_container,page1);
//提交切换
fragmentTransaction.commit();
try
{
//订阅主题
mqtt_client.subscribe(mqtt_sub_topic,2);
}
catch (MqttException e)
{
// 处理异常,例如打印堆栈跟踪或执行其他恢复操作
e.printStackTrace();
}
}
//***********************************************
//********************给fragment调用用于自己切换页面切换函数
public void replaceFragment(Fragment fragment)
{
FragmentTransaction transaction = getSupportFragmentManager().beginTransaction();
transaction.replace(R.id.fragment_container, fragment);
transaction.addToBackStack(null); // Optional: Add to back stack for navigation
transaction.commit();
}
//*********************************************
//******************************************************MQTT连接函数
public void mqtt_init_Connect()
{
try {
//实例化mqtt_client,填入我们定义的serverUri和clientId,然后MemoryPersistence设置clientid的保存形式,默认为以内存保存
mqtt_client = new MqttClient(serverUri,clientId,new MemoryPersistence());
//创建并实例化一个MQTT的连接参数对象
options = new MqttConnectOptions();
//然后设置对应的参数
options.setUserName(userName); //设置连接的用户名
options.setPassword(passWord.toCharArray()); //设置连接的密码
options.setConnectionTimeout(60); // 设置超时时间,单位为秒
options.setKeepAliveInterval(60); //设置心跳,30s
options.setAutomaticReconnect(true); //是否重连
//设置是否清空session,设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(false);
//****************************************************设置回调
mqtt_client.setCallback(new MqttCallback() {
@Override
//*****************************突发断开重连
public void connectionLost(Throwable throwable) {
//连接丢失后,一般在这里面进行重连
topage1.Tofragment_other("断开重连");
try {
mqtt_client.reconnect();
topage1.Tofragment_other("断开重连成功");
} catch (MqttException e) {
// 处理异常,例如打印堆栈跟踪或执行其他恢复操作
topage1.Tofragment_ERR("断开重连异常");
e.printStackTrace();
}
}
//*****************************************************
//*****************************************************收到订阅消息后的处理
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
//收到订阅消息后会执行这里
topage1.Tofragment_sub(mqttMessage.toString());
topage1.Tofragment_sub_topic(s);
}
//**********************************
//*****************************发布消息后处理
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
//publish后会执行到这里
}
//*****************************
});
//连接mqtt服务器
mqtt_client.connect(options);
}
catch (Exception e)
{
topage1.Tofragment_ERR("MQTT连接异常");
e.printStackTrace();
}
}
//****************************************************
//****************************************接口回调函数
public void Activitytopage1_callback(Activitytopage1 callback){
topage1 = callback;
}
//****************************************
//*************************************释放资源
@Override
protected void onDestroy(){
super.onDestroy();
if(mqtt_client.isConnected()){
try{
mqtt_client.close();
}catch (Exception e)
{
e.printStackTrace();
}
}
}
//*****************************************
//***********************************************获取设备AndroidID
public static String getAndroidID(Context context) {
return Settings.Secure.getString(context.getContentResolver(), Settings.Secure.ANDROID_ID);
}
//***********************************************
};
发布函数:
我自己进行了一层小封装
// **********************发布***********************************
public void my_publish(String mytopic ,byte[] mypayload){
try {
//发布信息
activity.mqtt_client.publish(mytopic, mypayload, qos, retained);
Toast.makeText(getContext(),"发布成功",Toast.LENGTH_SHORT).show();
log.setText("发布成功");
} catch (MqttException e) {
// 处理异常,例如打印堆栈跟踪或执行其他恢复操作
e.printStackTrace();
Toast.makeText(getContext(),"发布失败",Toast.LENGTH_SHORT).show();
err.setText("发布失败");
}
}
// ********************************************************
订阅:
// **********************订阅***********************************
public void my_sub(String mytopic_sub ){
try {
activity.mqtt_client.subscribe(mytopic_sub,2);
}catch (MqttException e)
{
e.printStackTrace();
}
}
// ********************************************************
这里模拟的远程控制灯泡开关,的例子。