nginx反向代理kafka集群实现内外网隔离访问 —— 筑梦之路

背景说明

我们在使用Kafka客户端连接到Kafka集群时,即使连接的节点只配置了一个集群的Broker地址,该Broker将返回给客户端集群所有节点的信息列表。然后客户端使用该列表信息(Topic的分区信息)再与集群进行数据交互。这里Kafka列表信息为服务配置文件service.propertiesadvertised.listeners配置项中的信息。例如:

advertised.listeners=PLAINTEXT://192.168.1.1:9092

这样在通信中就存在一个网络连通性问题。如果Kafka位于内网环境,而客户端位于外网环境,即使内外网配置了IP地址映射(网络层面的NAT),由于返回给外网客户端的IP列表是内网地址,客户端和Broker第一次通讯获取集群元数据中包含是advertised.listeners配置中的内网地址信息,由于内外网隔离,就会出现客户端和集群的通信无法通讯的报错。

通常对于这种情况的解决方案是Kafka集群的advertised.listeners配置项使用主机名方式。例如:

advertised.listeners=PLAINTEXT://Kafka.node1:9092

在实际业务场景中,系统架构上确实存在需要使用Nginx的场景,这时候就需要我们在架构设计上要符合Kafka的通讯机制。

kafka集群服务端

1. 配置kafka集群

# kafka1 节点1

cat server.properties |grep -v ^# | grep -v ^$
broker.id=0
listeners=INTERNAL://192.168.100.100:9093,EXTERNAL://kafka1:9092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://192.168.100.100:9093,EXTERNAL://kafka1:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.100.100:2181,192.168.100.101:2181,192.168.100.102:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

# kafka2 节点2

cat server.properties |grep -v ^# | grep -v ^$
broker.id=0
listeners=INTERNAL://192.168.100.101:9093,EXTERNAL://kafka2:9092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://192.168.100.101:9093,EXTERNAL://kafka2:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.100.100:2181,192.168.100.101:2181,192.168.100.102:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

# kafka3 节点3

cat server.properties |grep -v ^# | grep -v ^$
broker.id=0
listeners=INTERNAL://192.168.100.102:9093,EXTERNAL://kafka3:9092
inter.broker.listener.name=INTERNAL
advertised.listeners=INTERNAL://192.168.100.102:9093,EXTERNAL://kafka3:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.100.100:2181,192.168.100.101:2181,192.168.100.102:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

2. 配置hosts映射

# 三个节点均需要配置主机名映射

cat  /etc/hosts

192.168.100.100 kafka1
192.168.100.101 kafka2
192.168.100.102 kafka3

3. 启动命令示例

./bin/kafka-server-start.sh  -daemon config/server.properties 

Nginx中间代理

nginx代理作为kafka服务端和客户端隔离的中转,因此需要两边的网络都能访问。

1. 配置hosts主机名映射

192.168.100.100  kafka1
192.168.100.101  kafka2
192.168.100.102  kafka3

2. 配置TCP四层代理

nginx四层代理需要使用stream模块,因此nginx一般建议使用rpm包安装或编译安装开启此模块。

# nginx 实例1
stream {
  server{
      listen 9092;
      proxy_pass node1;
  }
  upstream node1{
    server kafka1:9092 weight=1;
  }
}

# nginx实例2

stream {
  server{
      listen 9092;
      proxy_pass node2;
  }
  upstream node2{
    server kafka2:9092 weight=1;
  }
}

# nginx实例3

stream {
   #1
  server{
      listen 9092;
      proxy_pass node3;
  }
  upstream node3{
    server kafka3:9092 weight=1;
  }
}

kafka客户端

1. 配置hosts主机名映射

cat /etc/hosts

192.168.200.100  kafka1
192.168.200.101  kafka2
192.168.200.102  kafka3

192.168.200.x 为nginx节点所在ip

2. 访问测试验证

# 查看有哪些topic

kafka-topics.sh --bootstrap-server kafka1:9092 --list

# 创建一个topic

kafka-topics.sh --bootstrap-server kafka1:9092 --create --replication-factor 2 --partitions 30 --topic test-topic

# 生产一条消息

echo '{"key":"value"}' | kafka-console-producer.sh --broker-list kafka1:9092 --topic test-topic

# 消费数据

kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic test-topic --from-beginning --max-messages 3

架构说明

 数据流说明

(1)客户端通过Nginx代理获取到Kafka集群的元数据信息。

(2)元数据信息中包含的是集群的主机名信息,客户端获取到主机名后,在hosts文件中进行解析。例如:kafka1映射为对应的Nginx实例地址:192.168.200.100。这样数据流就会再次Nginx进行交互,相当于对Kafka客户端进行了"欺骗"

总结

 使用Nginx代理Kafka集群,架构并没有较少对外暴露服务的实例数量。架构上主要能实现内外网隔离安全。例如外网Kafka客户端不用开通到内网Kafka集群的直连火墙。只需要将Nginx集群的地址暴露给外网客户端。

架构上可以将Kafka集群中三个节点的监听端口分别配置成不同端口,例如:

advertised.listeners=PLAINTEXT://kafka1:9092
advertised.listeners=PLAINTEXT://kafka2:9093
advertised.listeners=PLAINTEXT://kafka3:9094

在一台节点服务器上运行一个Nginx,然后配置三个不同的server分别监听不同的端口:

stream {
   #1
  server{
      listen 9092;
      proxy_pass node1;
  }
  upstream node1{
    server kafka1:9092 weight=1;
  }
   #2
  server{
      listen 9093;
      proxy_pass node2;
  }
  upstream node2{
    server kafka2:9093 weight=1;
  }
   #3
  server{
      listen 9094;
      proxy_pass node3;
  }
  upstream node3{
    server kafka3:9094 weight=1;
  }
}

在资源上我们只需要部署一台对外暴露的Nginx服务器IP和三个端口即可

192.168.200.100 kafka1
192.168.200.100 kafka2
192.168.200.100 kafka3

这样客户端就避免和Kafka Broker直连,而是通过nginx进行了路由。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/629240.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MQTT_客户端安装_1.4

下载地址 MQTTX 下载 下一步直接安装即可 界面介绍

Lambda 表达式详解

LAMBDA ⚪ λ 希腊字母表中排序第十一位的字母, 英语名称为Lambda ⚪ 避免匿名内部类定义过多 ⚪ 其实质属于函数式编程的概念 ⚪ 也可称为闭包 ⚪ Lambda允许把一个函数作为方法的参数(函数作为参数传递进方法中)。 Lambda是在jdk8之后出现的所以现…

【数据可视化-05】:Plotly数据可视化宝典

一、引言 数据可视化是机器学习流程中不可或缺的一部分。通过图形和图表展示数据,我们可以更直观地理解数据的分布、趋势和关联,从而更有效地进行数据分析、特征工程和模型评估。Plotly是一个功能强大且灵活的数据可视化库,它提供了丰富的图表…

人物介绍模板 PSD 源文件免费获取

免费获取 下载链接在最后! 下载链接在最后! 下载链接在最后! 下载链接在最后! 下载链接在最后! 链接:https://pan.baidu.com/s/1sq3e6djMdZt76Sh_uqVxWg 提取码:naun

相约蓉城 | 全视通邀您参加 CHCC 2024第25届全国医院建设大会

第25届全国医院建设大会暨国际医院建设、装备及管理展览会(CHCC2024),将于5月17日-19日在成都中国西部国际博览城盛大启幕。 全视通将携智慧病房、智慧门诊、智慧手术室、智慧后勤、智慧康养等产品方案亮相11号厅K05展位,期待与您…

Math.Round()函数说明

Math.Round()并不是严格意义上的是四舍五入函数。它默认的执行的是“银行家舍入”算法,即四舍六入五取偶。概括为:四舍六入五考虑、五后非零就进一,五后皆零看奇偶,五前为偶应舍去、五前为奇要进一。 当为5时,取离着最…

TypeScript学习日志-第二十四天(webpack构建ts+vue3)

webpack构建tsvue3 一、构建项目目录 如图: shim.d.ts 这个文件用于让ts识别.vue后缀的 后续会说 并且给 tsconfig.json 增加配置项 "include": ["src/**/*"] 二、基础构建 安装依赖 安装如下依赖: npm install webpack -D …

python实现贪吃蛇游戏,python贪吃蛇

欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一.前言 二.代码 三.使用 四.总结 一.前言 贪吃蛇游戏是一款经典的休闲益智类游戏,以下是关于该游戏的详细介绍: 游戏类型与平台:

JAVA云his医院管理系统源码 SaaS模式+融合B/S版电子病历 基于云计算技术开发的云his医院管理系统

JAVA云his医院管理系统源码 SaaS模式融合B/S版电子病历 基于云计算技术开发的云his医院管理系统 定义 美国著名教授Morris.Collen于1988年曾著文为医院信息系统下了如下定义:利用电子计算机和通讯设备,为医院所属各部门提供病人诊疗信息和行政管理信息…

FPGA相关论文阅读

一、Achieving 100Gbps Intrusion Prevention on a Single Server 论文名称中文翻译:在单台服务器上实现100Gbps吞吐量的入侵防御检测。 文章中的Mixed-1和Norm-1 二、Distributed Password Hash Computation on Commodity Heterogeneous Programmable Platforms…

STM32学习-1 新建工程

教学资料来自【STM32入门教程-2023版 细致讲解 中文字幕】 https://www.bilibili.com/video/BV1th411z7sn/?p5&share_sourcecopy_web&vd_sourcec6cfedd1c739ca8502f041514e158616 在keil中,每个代码最后一行必须是空的,不然运行会报错 配置库函…

Failed to start tomcat.service: Unit is not loaded properly: Bad message 如何解决?

错误 “Failed to start tomcat.service: Unit is not loaded properly: Bad message” 通常意味着的 tomcat.service systemd 配置文件存在语法错误或配置不正确。为了解决这个问题,一步步检查和修正这个服务文件。 1. 检查 tomcat.service 文件 首先&#xff0c…

免费泛域名证书申请

通配符证书是一种 SSL/TLS 证书,可用于保护多个域(主机),由域名字段中的通配符 (*) 指示。 如果您有很多需要保护的域或子域,这会很有帮助,因为它可以节省您的时间和金钱。 本文将讨论通配符证书、它们的工…

【文献阅读】李井林等2021ESG促企业绩效的机制研究——基于企业创新的视角

ESG促进企业绩效的机制 摘要 0.引言与文献综述 1.理论分析与研究假设 1.1企业ESG表现与企业绩效 假设1a:企业的环境表现对企业绩效存在正向影响效应。 假设1b:企业的社会表现对企业绩效存在正向影响效应。 假设1c:企业的公司治理表现对企业…

开放式耳机哪款具有高性价比?5款高分开放式耳机倾力推荐

作为多年的耳机发烧友,强烈给你们安利开放式耳机,真的是舒适耐用,性价比高。开放式耳机以其独特的不入耳设计,给用户带来了最舒适的佩戴感受。如果小白还不知道怎么选择高性价比的开放式耳机那就看看我的总结吧!下面就…

Fast-Poisson-Image-Editing代码介绍(二)

目录 2.fpei文件下 2.6 number_solver.py 2.7 process.py 2.8 taichi_solver.py 3. 算法总结 4. 代码运行 4.1 测试 4.2 基于GUI后端自定义框输出编辑图像结果 4.2.1 下载open-cv 4.2.2 输入命令 4.2.3 自定义框 4.2.4 按ESC退出 接续Fast-Poisson-Image-Editing代码…

MYSQL和JAVA中将中文汉字按照拼音首字母排序

一、MYSQL将中文汉字按照拼音首字母排序 数据库使用的字符编码是utf8_general_ci,如下 ORDER BY CONVERT(表名.字段名 USING gbk) COLLATE gbk_chinese_ci ASC;若是表查询,CONVERT中可以不添加表名。 查询结果如下: 二、JAVA中将中文汉字…

【Qt】widget圆角,styleSheet

仅配置widget,不设置其子组件。 #widget{background-color: rgba(255, 255, 255, 100); border-top-left-radius: 20; border-top-right-radius: 20; border-bottom-left-radius: 20; border-bottom-right-radius: 20;}

Vue3.0 一些总结 【持续更新】

1. reactive 只适用于对象 (包括数组和内置类型,如 Map 和 Set,它不支持如 string、number 或 boolean 这样的原始类型) import { reactive } from vueconst counter reactive({count: 0 })console.log(counter.count) // 0 counter.count注意&#xf…

msvcp140_codecvt_ids.dll找不到要如何处理?简单的修复方法分享

在使用Windows操作系统时,用户可能会遇到“无法找到msvcp140_codecvt_ids.dll”这一错误信息。该提示通常发生在启动某些应用程序时,提示失去了关键的动态链接库文件(DLL)依赖。此DLL文件属于Microsoft Visual C Redistributable软…