1. 什么是 MQTT?
MQTT(Message Queuing Telemetry Transport)
是一种轻量级、基于发布-订阅模式的消息传输协议,适用于资源受限的设备和低带宽、高延迟或不稳定的网络环境。它在物联网应用中广受欢迎,能够实现传感器、执行器和其它设备之间的高效通信。
2. 为什么 MQTT 是适用于物联网的最佳协议?
MQTT 所具有的适用于物联网特定需求的特点和功能,使其成为物联网领域最佳的协议之一。它的主要特点包括:
-
轻量级:物联网设备通常在处理能力、内存和能耗方面受到限制。MQTT 开销低、报文小的特点使其非常适合这些设备,因为它消耗更少的资源,即使在有限的能力下也能实现高效的通信。
-
可靠:物联网网络常常面临高延迟或连接不稳定的情况。MQTT 支持多种 QoS 等级、会话感知和持久连接,即使在困难的条件下也能保证消息的可靠传递,使其非常适合物联网应用。
-
安全通信:安全对于物联网网络至关重要,因为其经常涉及敏感数据的传输。为确保数据在传输过程中的机密性,MQTT 提供传输层安全(TLS)和安全套接层(SSL)加密功能。此外,MQTT 还通过用户名/密码凭证或客户端证书提供身份验证和授权机制,以保护网络及其资源的访问。
-
双向通信:MQTT 的发布-订阅模式为设备之间提供了无缝的双向通信方式。客户端既可以向主题发布消息,也可以订阅接收特定主题上的消息,从而实现了物联网生态系统中的高效数据交换,而无需直接将设备耦合在一起。这种模式也简化了新设备的集成,同时保证了系统易于扩展。
-
连续、有状态的会话:MQTT 提供了客户端与 Broker 之间保持有状态会话的能力,这使得系统即使在断开连接后也能记住订阅和未传递的消息。此外,客户端还可以在建立连接时指定一个保活间隔,这会促使 Broker 定期检查连接状态。如果连接中断,Broker 会储存未传递的消息(根据 QoS 级别确定),并在客户端重新连接时尝试传递它们。这个特性保证了通信的可靠性,降低了因间断性连接而导致数据丢失的风险。
-
大规模物联网设备支持:物联网系统往往涉及大量设备,需要一种能够处理大规模部署的协议。MQTT 的轻量级特性、低带宽消耗和对资源的高效利用使其成为大规模物联网应用的理想选择。通过采用发布-订阅模式,MQTT 实现了发送者和接收者的解耦,从而有效地减少了网络流量和资源使用。此外,协议对不同 QoS 等级的支持使得消息传递可以根据需求进行定制,确保在各种场景下获得最佳的性能表现。
-
语言支持:物联网系统包含使用各种编程语言开发的设备和应用。MQTT 具有广泛的语言支持,使其能够轻松与多个平台和技术进行集成,从而实现了物联网生态系统中的无缝通信和互操作性。您可以阅读我们的 MQTT 客户端编程系列文章,学习如何在 PHP、Node.js、Python、Golang、Node.js 等编程语言中使用 MQTT。
3. MQTT 的工作原理
- 要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。
3.1. MQTT 客户端
- 任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。
3.2. MQTT Broker
- MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。
3.3. 发布-订阅模式
- 发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。
- 下图展示了 MQTT 发布/订阅过程。温度传感器作为客户端连接到 MQTT Broker,并通过发布操作将温度数据发布到一个特定主题(例如 Temperature)。MQTT Broker 接收到该消息后会负责将其转发给订阅了相应主题(Temperature)的订阅者客户端。
4. MQTT 的工作流程
-
在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:
- 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
- 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
- MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。
5. MQTT集群搭建
- 官方中文文档地址
- 环境准备
主机 | 系统 |
---|---|
172.16.60.10 | centos7 |
172.16.60.20 | centos7 |
172.16.60.30 | centos7 |
5.1. 安装部署
#安装emqx,每台主机都需要安装
yum -y install https://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
yum -y install https://rpms.remirepo.net/enterprise/remi-release-7.rpm
yum -y install openssl11
yum -y install https://www.emqx.com/zh/downloads/broker/v5.6.0/emqx-5.6.0-el7-amd64.rpm
#启动emqx
systemctl start emqx.service
5.2. 创建emqx集群
5.2.1. 修改配置文件
- 60.10配置文件修改
vim /etc/emqx/emqx.conf
node {
name = "emqx@172.16.60.10" #指定了节点的名称
cookie = "emqxsecretcookie" #用于集群节点之间的身份验证。所有集群节点都必须使用相同的cookie值才能加入集群
data_dir = "/var/lib/emqx" #指定了EMQX存储数据的目录路径,例如日志文件、持久化队列等
}
cluster {
name = emqxcl #指定了集群的名称,所有集群成员都必须使用相同的名称
discovery_strategy = manual #指定了集群发现策略,manual意味着集群成员之间的关系需要手动配置
}
log {
# file {
# level = warning
# }
# console {
# level = warning
# }
}
dashboard {
listeners.http {
bind = 18083 #指定了仪表盘绑定的端口号,这里是 18083。这意味着仪表盘可以通过http://<HostName>:18083 访问
}
}
- 60.20配置文件修改
node {
name = "emqx@172.16.60.20"
cookie = "emqxsecretcookie"
data_dir = "/var/lib/emqx"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
log {
# file {
# level = warning
# }
# console {
# level = warning
# }
}
dashboard {
listeners.http {
bind = 18083
}
}
- 60.30配置文件修改
node {
name = "emqx@172.16.60.30"
cookie = "emqxsecretcookie"
data_dir = "/var/lib/emqx"
}
cluster {
name = emqxcl
discovery_strategy = manual
}
log {
# file {
# level = warning
# }
# console {
# level = warning
# }
}
dashboard {
listeners.http {
bind = 18083
}
}
5.2.2. 创建集群
#在60.10执行以下命令
emqx_ctl cluster join emqx@172.16.60.20
#在60.30执行以下命令
emqx_ctl cluster join emqx@172.16.60.20
#在任意一台执行以下命令,查询集群状态
emqx_ctl cluster status
Cluster status: #{running_nodes =>
['emqx@172.16.60.10','emqx@172.16.60.20',
'emqx@172.16.60.30'],
stopped_nodes => []}
5.3. 登录web界面验证
- ip地址:18083
- ip地址可以是三个的任意一个IP地址即可
- 默认账号:admin,密码:public
6. 集群负载均衡
-
负载均衡(Load Balancing)用于均衡多个网络组件的负载,从而优化资源的使用,避免由于组件过载造成故障。负载均衡虽然不是集群中的必备组件,但是能给集群带来一些非常有用的特性,例如当配置在 EMQX 集群中时,将能带来如下优势:
- 均衡 EMQX 的负载,避免出现单节点过载的情况;
- 简化客户端配置,客户端只需连接到负载均衡器上,无需关心集群内部伸缩变化;
- TLS/SSL 终结,减轻 EMQX 集群的负担;
- 提高安全性,有了负载均衡在集群前端,能够通过设置阻止不需要的流量,保护 EMQX 集群免受恶意攻击。
6.1. 用 NGINX 负载均衡 EMQX 集群
- NGINX 是一种高性能多功能的服务器软件,可以作为 Web 服务器和反向代理服务器,除此以外,NGINX 还可以作为负载均衡器将来自客户端的请求分发到多个后端服务器,以确保负载平衡和性能优化。NGINX 非常适用于物联网应用,因为它可以处理大量的并发请求。在物联网中,设备数量庞大,因此需要一个可以处理大量请求的服务器来保证稳定性。EMQX 原生支持由多个 MQTT 服务器组成的分布式集群架构,因此,使用 NGINX 部署负载均衡以及 EMQX 集群,可以保证高可用性和可扩展性。
6.2. 功能与优势
使用 NGINX 负载均衡 EMQX 集群具备以下几个功能和优势:
- 作为反向代理服务器,NGINX 位于 MQTT 服务器端,代表 MQTT 客户端向 EMQX 集群发起 MQTT 连接请求,并代替 EMQX 集群处理请求,然后将 EMQX 集群的响应返回给 MQTT 客户端。这样的设置可以将多个集群隐藏起来,暴露一个接入点给 MQTT 客户端。MQTT 客户端只需要与 NGINX 通信,而不需要知道后面的集群数量和布局,这种方式可以提高系统的可维护性和可扩展性。
- NGINX 可用于终结 MQTT 客户端与 EMQX 集群之间经 SSL 加密的 MQTT 连接,减轻 EMQX 集群的加密解密负担。从而提供多种优势,如提高性能、简化证书管理和增强安全性。
- NGINX 具有灵活的负载均衡策略,以使用不同的策略来决定请求应该发送到集群中的哪个 EMQX 节点,有助于分摊流量和请求,提高性能和可靠性。例如粘性负载平衡,可将请求路由到同一后端服务器,从而提高性能和会话持久性。
6.3. 安装nginx
- 可以从 NGINX 官方网站下载最新的稳定版本
#安装依赖包
yum -y install gcc gcc-c++ pcre pcre-devel zlib zlib-devel openssl openssl-devel
#创建用户
useradd -s /sbin/nologin nginx
#创建安装目录
cd /usr/local
wget https://nginx.org/download/nginx-1.26.1.tar.gz
tar -xf nginx-1.26.1.tar.gz
cd nginx-1.26.1
./configure \
--prefix=/data/nginx/ \
--user=nginx \
--group=nginx \
--with-threads \
--with-http_stub_status_module \
--with-http_ssl_module \
--with-http_realip_module \
--with-stream \
--with-stream_ssl_module
#预编译
make
#编译
make
#安装
make install
#将 NGINX 可执行文件连接到系统 PATH 中的目录
ln -s /data/nginx/sbin/nginx /usr/local/bin/nginx
6.4. 修改配置文件
vim /data/nginx/conf/nginx.conf
worker_processes auto; #指定Nginx启动的工作进程数,auto表示根据系统的CPU核心数自动设置工作进程数
events {
worker_connections 1024; #定义事件模块,worker_connections指定每个工作进程可以同时打开的最大连接数,这里是1024
}
error_log /data/nginx/logs/error.log warn; #定义错误日志文件的位置和日志级别,这里设置为警告级别及以上
pid /data/nginx/nginx.pid; #指定存储Nginx主进程ID的文件路径
stream { #定义一个流模块,用于配置TCP/UDP代理
#定义自定义日志格式,包含客户端地址、端口、协议、状态码、时间戳、上游服务器地址、发送字节数和连接时间
log_format proxy '$remote_addr $remote_port $protocol $status
[$time_iso8601] '
'"$upstream_addr" "$upstream_bytes_sent"
"$upstream_connect_time"' ;
access_log /data/nginx/logs/access.log proxy; #指定访问日志文件的路径,并使用上面定义的proxy日志格式
upstream mqtt_servers { #定义一个名为mqtt_servers的上游服务器组,用于负载均衡
#在上游服务器组中添加一个服务器,指定其IP地址和端口号,以及最大失败次数和故障超时时间
server 172.16.60.10:1883 max_fails=2 fail_timeout=10s;
server 172.16.60.20:1883 max_fails=2 fail_timeout=10s;
server 172.16.60.30:1883 max_fails=2 fail_timeout=10s;
}
server { #定义一个服务器块,用于配置代理服务器的监听端口和其他设置
listen 1883; #指定Nginx监听的端口号,这里是1883,通常用于MQTT协议
proxy_pass mqtt_servers; #指定代理请求转发的上游服务器组名称
proxy_protocol on; #启用PROXY协议,允许获取客户端的真实IP地址和端口信息
proxy_connect_timeout 10s; #设置与上游服务器建立连接的超时时间
proxy_timeout 1800s; #设置代理服务器的读写超时时间,这里是30分钟
proxy_buffer_size 3M; #设置代理缓冲区的大小
tcp_nodelay on; #启用TCP_NODELAY选项,意味着立即发送数据而不等待更多数据积累
}
}
6.5. 启动nginx
#启动服务
nginx
#加载配置文件
nginx -s reload
#停止服务
nginx -s stop