Docker Compose 构建 EMQX 集群 实现mqqt 和websocket

EMQX 集群化管理mqqt真香

目录

#目录 /usr/emqx

容器构建

vim docker-compose.yml

version: '3'

services:
  emqx1:
    image: emqx:5.8.3
    container_name: emqx1
    environment:
    - "EMQX_NODE_NAME=emqx@node1.emqx.io"
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    networks:
      emqx-bridge:
        aliases:
        - node1.emqx.io
    ports:
      - 1883:1883
      - 8083:8083
      - 8084:8084
      - 8883:8883
      - 18083:18083 
    # volumes:
    #   - $PWD/emqx1_data:/opt/emqx/data

  emqx2:
    image: emqx:5.8.3
    container_name: emqx2
    environment:
    - "EMQX_NODE_NAME=emqx@node2.emqx.io"
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    networks:
      emqx-bridge:
        aliases:
        - node2.emqx.io
    # volumes:
    #   - $PWD/emqx2_data:/opt/emqx/data

networks:
  emqx-bridge:
    driver: bridge

启动

docker-compose up -d

集群状态

#查看集群状态
docker exec -it emqx1 sh -c "emqx ctl cluster status"

#验证
telnet 192.168.0.15 1883
#内网
nc -zv  192.168.0.15 1883 

#账户
admin
#默认密码
public

服务开放端口

1883,8083,8084,8883,18083

端口占用

EMQX 默认使用以下端口,请确保这些端口未被其他应用程序占用,并按照需求开放防火墙以保证 EMQX 正常运行。

端口协议描述
1883TCPMQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。
8883TCPMQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。
8083TCPMQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。
8084TCPMQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。
18083HTTPEMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。
4370TCPErlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset。
5370TCP集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset。

前端js示

<!DOCTYPE html>
<html>
<head>
  <title>MQTT WebSocket Test</title>
  <script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
  <script>
    // 使用提供的客户端 ID 或生成一个唯一 ID
    const clientId = 'emqx_NjI4MT2';

    // 配置 WebSocket MQTT broker 地址
    const host = 'ws://127.0.0.1:8083/mqtt';

    // MQTT 连接选项
    const options = {
      keepalive: 60, // 心跳时间间隔
      clientId: clientId,
      protocolId: 'MQTT', // 协议 ID
      protocolVersion: 5, // 使用 MQTT 5 协议
      clean: true, // 是否清除会话
      reconnectPeriod: 1000, // 重连间隔时间 (ms)
      connectTimeout: 30 * 1000, // 连接超时时间 (ms)
      username: 'admin', // 设置用户名
      password: 'public', // 设置密码
      will: {
        topic: 'pushRanking/1',
        payload: 'Connection Closed abnormally..!',
        qos: 0,
        retain: false
      },
    };

    console.log('Connecting mqtt client');

    // 连接到 MQTT Broker
    const client = mqtt.connect(host, options);

    // 连接成功回调
    client.on('connect', () => {
      console.log('Connected to MQTT broker');

      // 订阅主题 pushRanking/#,支持通配符
      client.subscribe('pushRanking/1', { qos: 0 }, (err) => {
        if (!err) {
          console.log('Subscribed to topic: pushRanking/1');
        } else {
          console.error('Failed to subscribe:', err);
        }
      });
    });

    // 处理接收到的消息
    client.on('message', (topic, message) => {
      console.log(`Received message from topic "${topic}": ${message.toString()}`);
    });

    // 连接错误回调
    client.on('error', (err) => {
      console.log('Connection error:', err);
      client.end();
    });

    // 重新连接回调
    client.on('reconnect', () => {
      console.log('Reconnecting...');
    });

    // 连接关闭回调
    client.on('close', () => {
      console.log('Connection closed');
    });

    // 模拟消息发布以测试接收
    setTimeout(() => {
      client.publish('pushRanking/1', JSON.stringify({ msg: 'hello' }), { qos: 0 });
    }, 5000);
  </script>
</body>
</html>

后端

package emqx

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "testing"
    "time"
)

func TestMQTT(t *testing.T) {
    // 创建 EMQX 客户端实例
    client := NewEMQXClient("tcp://127.0.0.1:1883", "test-client", "admin", "QfTzLy3cop9NOGWj")

    // 连接到 EMQX
    if err := client.Connect(); err != nil {
       fmt.Printf("Failed to connect: %v\n", err)
       return
    }
    defer client.Disconnect()

    // 订阅主题
    client.Subscribe("testtopic/#", 1, func(client mqtt.Client, msg mqtt.Message) {
       fmt.Printf("Message received: %s\n", msg.Payload())
    })

    // 发布消息
    client.Publish("testtopic/1", 1, false, "Hello from Golang!")

    // 保持连接一段时间以接收消息
    time.Sleep(10 * time.Second)
}

/*长连接的场景 DEMO

func main() {
    client := emqxclient.NewEMQXClient("tcp://broker.emqx.io:1883", "test-client", "", "")

    if err := client.Connect(); err != nil {
       fmt.Printf("Failed to connect: %v\n", err)
       return
    }
    // 使用 defer 确保程序退出时断开连接
    defer client.Disconnect()

    // 订阅主题
    client.Subscribe("test/topic", 1, func(client mqtt.Client, msg mqtt.Message) {
       fmt.Printf("Received message: %s\n", msg.Payload())
    })

    // 发布消息
    client.Publish("test/topic", 1, false, "Hello from Golang!")

    // 捕获退出信号
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

    fmt.Println("Running... Press Ctrl+C to exit.")
    <-signalChan
    fmt.Println("Exiting...")
}



*/

/*JS 调用如下
import mqtt from 'mqtt';

const brokerURL = 'ws://broker.emqx.io:8083/mqtt'; // WebSocket 连接地址
const clientID = `mqttjs_${Math.random().toString(16).substr(2, 8)}`;

// 创建客户端
const client = mqtt.connect(brokerURL, {
  clientId: clientID,
  username: '', // 如需要认证,填入用户名
  password: '', // 如需要认证,填入密码
});

// 连接事件
client.on('connect', () => {
  console.log('Connected to EMQX');

  // 订阅主题
  client.subscribe('test/topic', (err) => {
    if (!err) {
      console.log('Subscribed to topic: test/topic');
    } else {
      console.error('Failed to subscribe:', err);
    }
  });

  // 发布消息
  client.publish('test/topic', 'Hello from JavaScript!');
});

// 接收消息事件
client.on('message', (topic, message) => {
  console.log(`Received message on topic "${topic}": ${message.toString()}`);
});

// 错误事件
client.on('error', (err) => {
  console.error('Connection error:', err);
});







*/

common封装调用

package emqx

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

type EMQXClient struct {
    client mqtt.Client
}

// NewEMQXClient 初始化 EMQX 客户端
func NewEMQXClient(broker string, clientID string, username string, password string) *EMQXClient {
    opts := mqtt.NewClientOptions().
       AddBroker(broker).
       SetClientID(clientID).
       SetUsername(username).
       SetPassword(password).
       SetKeepAlive(60 * time.Second).
       SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
          fmt.Printf("Received message on topic: %s, message: %s\n", msg.Topic(), msg.Payload())
       }).
       SetPingTimeout(1 * time.Second)

    client := mqtt.NewClient(opts)
    return &EMQXClient{client: client}
}

// Connect 连接到 EMQX
func (c *EMQXClient) Connect() error {
    token := c.client.Connect()
    if token.Wait() && token.Error() != nil {
       return token.Error()
    }
    fmt.Println("Connected to EMQX broker")
    return nil
}

// Publish 发布消息
func (c *EMQXClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
    token := c.client.Publish(topic, qos, retained, payload)
    token.Wait()
    return token.Error()
}

// Subscribe 订阅主题
func (c *EMQXClient) Subscribe(topic string, qos byte, callback mqtt.MessageHandler) error {
    token := c.client.Subscribe(topic, qos, callback)
    token.Wait()
    return token.Error()
}

// Unsubscribe 取消订阅
func (c *EMQXClient) Unsubscribe(topics ...string) error {
    token := c.client.Unsubscribe(topics...)
    token.Wait()
    return token.Error()
}

// Disconnect 断开连接
func (c *EMQXClient) Disconnect() {
    c.client.Disconnect(250)
    fmt.Println("Disconnected from EMQX broker")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/946309.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Cesium】三、实现开场动画效果

文章目录 实现效果实现方法实现代码组件化 实现效果 实现方法 Cesium官方提供了Camera的flyTo方法实现了飞向目的地的动画效果。 官方API&#xff1a;传送门 这里只需要用到目的地&#xff08;destination&#xff09;和持续时间&#xff08;duration&#xff09;这两个参数…

Qt从入门到入土(七)-实现炫酷的登录注册界面(下)

前言 Qt从入门到入土&#xff08;六&#xff09;-实现炫酷的登录注册界面&#xff08;上&#xff09;主要讲了如何使用QSS样式表进行登录注册的界面设计&#xff0c;本篇文章将介绍如何对登录注册界面进行整体控件的布局&#xff0c;界面的切换以及实现登录、记住密码等功能。…

智能化人才招聘系统是怎样的?

随着企业规模的扩大和业务范围的拓展&#xff0c;人才招聘成为了企业发展的关键环节。然而&#xff0c;市面上的人才招聘系统琳琅满目&#xff0c;质量参差不齐&#xff0c;许多企业发现&#xff0c;并非所有系统都能满足他们的需求&#xff0c;特别是智能化的需求。今天&#…

论文分享 | PromptFuzz:用于模糊测试驱动程序生成的提示模糊测试

大语言模型拥有的强大能力可以用来辅助多种工作&#xff0c;但如何有效的辅助仍然需要人的精巧设计。分享一篇发表于2024年CCS会议的论文PromptFuzz&#xff0c;它利用模型提示生成模糊测试驱动代码&#xff0c;并将代码片段嵌入到LLVM框架中执行模糊测试。 论文摘要 制作高质…

[最佳方法] 如何将视频从 Android 发送到 iPhone

概括 将大视频从 Android 发送到 iPhone 或将批量视频从 iPhone 传输到 Android 并不是一件容易的事情。也许您已经尝试了很多关于如何将视频从 Android 发送到 iPhone 15/14 的方法&#xff0c;但都没有效果。但现在&#xff0c;通过本文中的这 6 种强大方法&#xff0c;您可…

cesium小知识: 处理动画的5种方式

在 Cesium 中处理动画可以通过多种方式实现,具体取决于你想要创建的动画类型。Cesium 提供了丰富的API来支持不同种类的动画,包括但不限于物体的移动、旋转、缩放、属性变化等。以下是几种常见的动画处理方法: 1. 使用 Entity 和 SampledProperty 对于动态数据或随时间变化…

003:如何理解 CNN 中的 RGB 图像和通道?

本文为合集收录&#xff0c;欢迎查看合集/专栏链接进行全部合集的系统学习。 合集完整版请参考这里。 在灰度图一节的最后&#xff0c;给出了一个由彩色图片转成灰度图的示例&#xff0c;并且通过 color_image.mode获取了图片的格式&#xff1a;彩色图片获取到的格式为 RGBA&a…

小程序基础 —— 07 创建小程序项目

创建小程序项目 打开微信开发者工具&#xff0c;左侧选择小程序&#xff0c;点击 号即可新建项目&#xff1a; 在弹出的新页面&#xff0c;填写项目信息&#xff08;后端服务选择不使用云服务&#xff0c;开发模式为小程序&#xff0c;模板选择为不使用模板&#xff09;&…

TP 钱包插件版本的使用

目前 TokenPocket 的几个平台中&#xff0c;以 ios 和 安卓版本最为常见&#xff0c;其实很少有人知道&#xff0c;浏览器上有一个插件版本的 Tp, 用电脑多的话&#xff0c;这也是一个挺好的选择。 最新版本现在支持Chrome、Brave 浏览器、Edge&#xff08;Firefox及Opera正在…

【AIGC】使用Java实现Azure语音服务批量转录功能:完整指南

文章目录 引言技术背景环境准备详细实现1. 基础架构设计2. 实现文件上传功能3. 提交转录任务crul4. 获取转录结果 使用示例结果示例最佳实践与注意事项总结 引言 在当今数字化时代&#xff0c;将音频内容转换为文本的需求越来越普遍。无论是会议记录、视频字幕生成&#xff0c…

【UVM】搭建一个验证平台

UVM环境组件 组件功能 sequence_item&#xff1a;包装数据 UVM中&#xff0c;所有的transaction都要从uvm_sequence_item派生sequence item是每一次driver与DUT互动的最小粒度内容sequence&#xff1a;产生数据 uvm_sequence是一个参数化的类&#xff0c;其参数是transactio…

用Python操作字节流中的Excel文档

Python能够轻松地从字节流中加载文件&#xff0c;在不依赖于外部存储的情况下直接对其进行读取、修改等复杂操作&#xff0c;并最终将更改后的文档保存回字节串中。这种能力不仅极大地提高了数据处理的灵活性&#xff0c;还确保了数据的安全性和完整性&#xff0c;尤其是在网络…

.Net加密与Java互通

.Net加密与Java互通 文章目录 .Net加密与Java互通前言RSA生成私钥和公钥.net加密出数据传给Java端采用java方给出的公钥进行加密采用java方给出的私钥进行解密 .net 解密来自Java端的数据 AES带有向量的AES加密带有向量的AES解密无向量AES加密无向量AES解密 SM2(国密)SM2加密Sm…

elasticsearch-java客户端jar包中各模块的应用梳理

最近使用elasticsearch-java客户端实现对elasticsearch服务的Api请求&#xff0c;现对elasticsearch-java客户端jar包中各模块的应用做个梳理。主要是对co.elastic.clients.elasticsearch路径下的各子包的简单说明。使用的版本为&#xff1a;co.elastic.clients:elasticsearch-…

119.【C语言】数据结构之快速排序(调用库函数)

目录 1.C语言快速排序的库函数 1.使用qsort函数前先包含头文件 2.qsort的四个参数 3.qsort函数使用 对int类型的数据排序 运行结果 对char类型的数据排序 运行结果 对浮点型数据排序 运行结果 2.题外话:函数名的本质 1.C语言快速排序的库函数 cplusplus网的介绍 ht…

JVM实战—G1垃圾回收器的原理和调优

1.G1垃圾回收器的工作原理 (1)ParNew CMS的组合有哪些痛点 Stop the World是最大的问题。无论是新生代GC还是老年代GC&#xff0c;都会或多或少产生STW现象&#xff0c;这对系统的运行是有一定影响的。 所以JVM对垃圾回收器的优化&#xff0c;都是朝减少STW的目标去做的。在这…

HuatuoGPT-o1:基于40K可验证医学问题的两阶段复杂推理增强框架,通过验证器引导和强化学习提升医学模型的推理能力

HuatuoGPT-o1&#xff1a;基于40K可验证医学问题的两阶段复杂推理增强框架&#xff0c;通过验证器引导和强化学习提升医学模型的推理能力 论文大纲理解1. 确认目标2. 分析过程3. 实现步骤4. 效果展示 解法拆解全流程提问俩阶段详细分析 论文&#xff1a;HuatuoGPT-o1, Towards …

HTML——45.单元格合并

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>表格</title></head><body><!--合并单元格&#xff1a;1.在代码中找到要合并的单元格2.在要合并的所有单元格中&#xff0c;保留要合并的第一个单元格…

electron在arm64架构交叉编译遇到libnotify/notify.h文件找不到错误记录

问题描述 在按照官方文档进行arm64下electron编译时出现下面的错误&#xff0c;编译环境为ubuntun22.04.5。 问题分析 由于当前目标架构是arm64&#xff0c;所以从上图可知sysroot为build/linux/debian_bullseye_arm64-sysroot&#xff0c;进入到该目录下查看libnotify的头文…

我的创作纪念日与2024年年报

我的创作纪念日 机缘 原来是你&#xff01; 收获 在创作的过程中都有哪些收获 获得了14668粉丝的关注。获得了正向或者反向的反馈&#xff1a;1万多赞、426评论、140多万阅读量等。认识和哪些志同道合的领域同行&#xff1a;有且再寻觅。 日常 &#x1f3e0;个人主页&…