EMQX 集群化管理mqqt真香
目录
#目录 /usr/emqx
容器构建
vim docker-compose.yml
version: '3'
services:
emqx1:
image: emqx:5.8.3
container_name: emqx1
environment:
- "EMQX_NODE_NAME=emqx@node1.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
networks:
emqx-bridge:
aliases:
- node1.emqx.io
ports:
- 1883:1883
- 8083:8083
- 8084:8084
- 8883:8883
- 18083:18083
# volumes:
# - $PWD/emqx1_data:/opt/emqx/data
emqx2:
image: emqx:5.8.3
container_name: emqx2
environment:
- "EMQX_NODE_NAME=emqx@node2.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
networks:
emqx-bridge:
aliases:
- node2.emqx.io
# volumes:
# - $PWD/emqx2_data:/opt/emqx/data
networks:
emqx-bridge:
driver: bridge
启动
docker-compose up -d
集群状态
#查看集群状态
docker exec -it emqx1 sh -c "emqx ctl cluster status"
#验证
telnet 192.168.0.15 1883
#内网
nc -zv 192.168.0.15 1883
#账户
admin
#默认密码
public
服务开放端口
1883,8083,8084,8883,18083
端口占用
EMQX 默认使用以下端口,请确保这些端口未被其他应用程序占用,并按照需求开放防火墙以保证 EMQX 正常运行。
端口 | 协议 | 描述 |
1883 | TCP | MQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。 |
8883 | TCP | MQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。 |
8083 | TCP | MQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。 |
8084 | TCP | MQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。 |
18083 | HTTP | EMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。 |
4370 | TCP | Erlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset。 |
5370 | TCP | 集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset。 |
前端js示
<!DOCTYPE html>
<html>
<head>
<title>MQTT WebSocket Test</title>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
<script>
// 使用提供的客户端 ID 或生成一个唯一 ID
const clientId = 'emqx_NjI4MT2';
// 配置 WebSocket MQTT broker 地址
const host = 'ws://127.0.0.1:8083/mqtt';
// MQTT 连接选项
const options = {
keepalive: 60, // 心跳时间间隔
clientId: clientId,
protocolId: 'MQTT', // 协议 ID
protocolVersion: 5, // 使用 MQTT 5 协议
clean: true, // 是否清除会话
reconnectPeriod: 1000, // 重连间隔时间 (ms)
connectTimeout: 30 * 1000, // 连接超时时间 (ms)
username: 'admin', // 设置用户名
password: 'public', // 设置密码
will: {
topic: 'pushRanking/1',
payload: 'Connection Closed abnormally..!',
qos: 0,
retain: false
},
};
console.log('Connecting mqtt client');
// 连接到 MQTT Broker
const client = mqtt.connect(host, options);
// 连接成功回调
client.on('connect', () => {
console.log('Connected to MQTT broker');
// 订阅主题 pushRanking/#,支持通配符
client.subscribe('pushRanking/1', { qos: 0 }, (err) => {
if (!err) {
console.log('Subscribed to topic: pushRanking/1');
} else {
console.error('Failed to subscribe:', err);
}
});
});
// 处理接收到的消息
client.on('message', (topic, message) => {
console.log(`Received message from topic "${topic}": ${message.toString()}`);
});
// 连接错误回调
client.on('error', (err) => {
console.log('Connection error:', err);
client.end();
});
// 重新连接回调
client.on('reconnect', () => {
console.log('Reconnecting...');
});
// 连接关闭回调
client.on('close', () => {
console.log('Connection closed');
});
// 模拟消息发布以测试接收
setTimeout(() => {
client.publish('pushRanking/1', JSON.stringify({ msg: 'hello' }), { qos: 0 });
}, 5000);
</script>
</body>
</html>
后端
package emqx
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"testing"
"time"
)
func TestMQTT(t *testing.T) {
// 创建 EMQX 客户端实例
client := NewEMQXClient("tcp://127.0.0.1:1883", "test-client", "admin", "QfTzLy3cop9NOGWj")
// 连接到 EMQX
if err := client.Connect(); err != nil {
fmt.Printf("Failed to connect: %v\n", err)
return
}
defer client.Disconnect()
// 订阅主题
client.Subscribe("testtopic/#", 1, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Message received: %s\n", msg.Payload())
})
// 发布消息
client.Publish("testtopic/1", 1, false, "Hello from Golang!")
// 保持连接一段时间以接收消息
time.Sleep(10 * time.Second)
}
/*长连接的场景 DEMO
func main() {
client := emqxclient.NewEMQXClient("tcp://broker.emqx.io:1883", "test-client", "", "")
if err := client.Connect(); err != nil {
fmt.Printf("Failed to connect: %v\n", err)
return
}
// 使用 defer 确保程序退出时断开连接
defer client.Disconnect()
// 订阅主题
client.Subscribe("test/topic", 1, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s\n", msg.Payload())
})
// 发布消息
client.Publish("test/topic", 1, false, "Hello from Golang!")
// 捕获退出信号
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
fmt.Println("Running... Press Ctrl+C to exit.")
<-signalChan
fmt.Println("Exiting...")
}
*/
/*JS 调用如下
import mqtt from 'mqtt';
const brokerURL = 'ws://broker.emqx.io:8083/mqtt'; // WebSocket 连接地址
const clientID = `mqttjs_${Math.random().toString(16).substr(2, 8)}`;
// 创建客户端
const client = mqtt.connect(brokerURL, {
clientId: clientID,
username: '', // 如需要认证,填入用户名
password: '', // 如需要认证,填入密码
});
// 连接事件
client.on('connect', () => {
console.log('Connected to EMQX');
// 订阅主题
client.subscribe('test/topic', (err) => {
if (!err) {
console.log('Subscribed to topic: test/topic');
} else {
console.error('Failed to subscribe:', err);
}
});
// 发布消息
client.publish('test/topic', 'Hello from JavaScript!');
});
// 接收消息事件
client.on('message', (topic, message) => {
console.log(`Received message on topic "${topic}": ${message.toString()}`);
});
// 错误事件
client.on('error', (err) => {
console.error('Connection error:', err);
});
*/
common封装调用
package emqx
import (
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type EMQXClient struct {
client mqtt.Client
}
// NewEMQXClient 初始化 EMQX 客户端
func NewEMQXClient(broker string, clientID string, username string, password string) *EMQXClient {
opts := mqtt.NewClientOptions().
AddBroker(broker).
SetClientID(clientID).
SetUsername(username).
SetPassword(password).
SetKeepAlive(60 * time.Second).
SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message on topic: %s, message: %s\n", msg.Topic(), msg.Payload())
}).
SetPingTimeout(1 * time.Second)
client := mqtt.NewClient(opts)
return &EMQXClient{client: client}
}
// Connect 连接到 EMQX
func (c *EMQXClient) Connect() error {
token := c.client.Connect()
if token.Wait() && token.Error() != nil {
return token.Error()
}
fmt.Println("Connected to EMQX broker")
return nil
}
// Publish 发布消息
func (c *EMQXClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
token := c.client.Publish(topic, qos, retained, payload)
token.Wait()
return token.Error()
}
// Subscribe 订阅主题
func (c *EMQXClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {
token := c.client.Subscribe(topic, qos, callback)
token.Wait()
return token.Error()
}
// Unsubscribe 取消订阅
func (c *EMQXClient) Unsubscribe(topics ...string) error {
token := c.client.Unsubscribe(topics...)
token.Wait()
return token.Error()
}
// Disconnect 断开连接
func (c *EMQXClient) Disconnect() {
c.client.Disconnect(250)
fmt.Println("Disconnected from EMQX broker")
}