数据接入
链接参考文档 LVS+Keepalived项目
车辆数据上收,TBox通过TCP协议连接到TSP平台 建立连接后进行数据上传。也可借由该连接实现远程控制等操作。
通过搭建 LV—NGinx—Netty实现高并发数据接入
- LVS:四层负载均衡(位于内核层):根据请求报文中的目标地址和端口进行调度
- NGinx:七层负载均衡(位于应用层):根据请求报文的内容进行调度,这种调度属于「代理」的方式
组件 | 角色 | 主机名称 | 虚拟ip/端口 |
---|---|---|---|
LVS+keepalived | active | -0007 | |
LVS+keepalived | backup | -0006 | |
Nginx | 负载 | -0005 | 8050 |
Nginx | 负载 | -0004 | 8050 |
Netty | 真实服务器 | -0003 | 8050 |
Netty | 真实服务器 | -0002 | 8050 |
Netty | 真实服务器 | -0003 | 8092 |
Netty | 真实服务器 | -0002 | 8092 |
Netty | 真实服务器 | -0001 | 8092 |
Netty | 真实服务器 | -0001 | 8092 |
使用华为云服务器 安装LVS 需要有VPC服务(免费),在控制台页面做虚拟ip绑定
一、安装LVS 服务
使用的是 DR 模式
- NET模式:LVS将数据请求包转发给真实服务器的时候,会修改成真实服务器的IP地址;在回复时真实服务器会把回复包发往LVS调度服务器 再发往客户端。
- TUN隧道模式:将原始数据包封装并添加新的包头(内容包括新的源地址及端口、目标地址及端口),从而实现将一个目标为调度器的VIP地址的数据包封装,通过隧道转发给后端的真实服务器(RealServer)感觉很复杂。
- DR模式:要求LVS调度服务器要和后端服务器在同一局域网下,为后端服务器添加lo回环地址为VIP(虚拟IP地址)这样回复给客户端 客户端会以为是连接的VIP进行回复的
DR模式不支持端口映射
#查看网卡 eth0
ifconfig
#执行 虚拟ip:172.25.94.187 广播地址(不变):172.25.94.191 子网掩码(不变):255.255.255.192 up:立即启用vip(虚拟ip)
ifconfig eth0:1 172.25.94.187 broadcast 172.25.94.191 netmask 255.255.255.192 up
#查看当前网卡信息
ip a
#安装keepalived
sudo yum install keepalived
#启动keepalived
systemctl start keepalived
#加入开机启动keepalived
systemctl enable keepalived
#重新启动keepalived
systemctl restart keepalived
#查看keepalived状态
systemctl status keepalived
LVS 模块内嵌lvs模块,只需要ipvsadm和keepalived安装
#查看Linux 内核版本
uname -r
#查看内核是否集成lvs模块
find /lib/modules/$(uname -r)/ -iname "**.ko*" | cut -d/ -f5-
#安装LVS管理工具:ipvsadm
yum install -y gcc gcc-c++ makepcre pcre-devel kernel-devel openssl-devel libnl-devel popt*
yum -y install ipvsadm
#启动ipvs
sudo ipvsadm
#查看是否支持lvs
sudo lsmod |grep ip_vs
#查看ipvsadm 版本
ipvsadm -v
#服务器添加路由规则
route add -host 172.25.94.187 dev ens33:0
route add -host 172.25.110.124 dev eth:0
#启用系统的包转发功能 #1:启用ip转发,0:禁止ip转发
echo "1" >/proc/sys/net/ipv4/ip_forward
#清除原有转发规则
systemctl restart keepalived
systemctl status ipvsadm
ipvsadm --clear
#添加虚拟ip规则 rr:负载均衡算法 轮询
ipvsadm -A -t 172.25.94.187:8043 -s rr
ipvsadm -a -t 172.25.94.187:8043 -r 172.25.94.151:8043 -g
ipvsadm -a -t 172.25.94.187:8043 -r 172.25.94.152:8043 -g
ipvsadm -l
#配置tcp/tcpfin/udp超时时间
ipvsadm --set 900 120 300
#添加虚机IP规则也可以通过修改文件实现
vim /etc/keepalived/keepalived.conf
global_defs {
router_id chery_21
}
vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 51
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
virtual_ipaddress {
172.25.110.124
}
}
virtual_server 172.25.110.124 8050 {
delay_loop 6
lb_algo rr
lb_kind DR
persistence_timeout 50
protocol TCP
real_server 172.25.110.19 8050 {
weight 1
TCP_CHECK {
connect_timeout 30
delay_before_retry 3
}
}
real_server 172.25.110.18 8050 {
weight 2
TCP_CHECK {
connect_timeout 30
}
}
}
全局定义(global_defs)
router_id chery_21
:定义了当前Keepalived实例的路由ID,这是唯一的标识符,用于在VRRP组中区分不同的Keepalived实例。
VRRP实例(vrrp_instance VI_1)
-
state MASTER
:设置当前实例的初始状态为MASTER。在VRRP组中,MASTER负责处理对虚拟IP的流量。 -
interface eth0
:指定VRRP通信使用的网络接口。 -
virtual_router_id 51
:虚拟路由的ID,用于在VRRP组中标识不同的虚拟路由器。 -
priority 100
:设置当前实例的优先级,优先级高的实例将成为MASTER。 -
advert_int 1
:VRRP通告的间隔时间,单位为秒。MASTER每隔这个时间会向其他节点发送VRRP通告。 -
authentication
:VRRP认证配置,确保只有授权的设备可以加入VRRP组。
auth_type PASS
:使用密码认证。auth_pass 1111
:认证密码。
-
virtual_ipaddress
:定义了虚拟IP地址,即VIP,客户端将访问此IP地址来访问服务。
虚拟服务器(virtual_server)
172.25.110.124 8050
:定义了虚拟服务器的IP地址和端口号,这里与VRRP的VIP相同,表明这个虚拟服务器是通过VIP来访问的。delay_loop 6
:健康检查的时间间隔,单位为秒。lb_algo rr
:负载均衡算法,这里使用的是轮询(rr)。lb_kind DR
:负载均衡类型,这里使用的是直接路由(DR),需要确保后端服务器(real_server)配置正确以支持DR模式。persistence_timeout 50
:会话保持时间,单位为秒。在指定时间内,来自同一客户端的请求将被转发到同一台后端服务器。protocol TCP
:使用TCP协议进行健康检查和负载均衡。
后端服务器(real_server)
-
定义了多个后端服务器,每个服务器都配置了IP地址、端口号、权重和健康检查设置。
-
weight
:权重,用于负载均衡时决定服务器的优先级。 -
TCP_CHECK
:TCP健康检查配置。
connect_timeout 30
:连接超时时间,单位为秒。delay_before_retry 3
:在重试之前等待的时间,单位为秒。
LVS负载均衡(LVS简介、三种工作模式、十种调度算法)
#列出当前LVS表中的所有配置,包括虚拟服务器和真实服务器的信息。
ipvsadm -Ln
#显示统计信息,包括已转发的连接数、入包个数、出包个数等。
ipvsadm -L --stats
#显示转发速率信息,包括每秒连接数、每秒入包个数、每秒出包个数等
ipvsadm -L --rate
#keepalived 日志
vim /var/log/message
二、安装nginx 服务
在nginx服务器和后端服务器 配置lo回环地址 否则回复将不成功
服务器上一般还需要修改lo网卡 配置成虚拟IP。华为云服务器使用的是Centos 8版本 没有 lo配置文件,通过 ifconfig lo:0 172.25.94.187 netmask 255.255.255.255 broadcast 172.25.94.187 up 华为云服务器不支持修改网卡,所以修改了 eth0网卡配置 ip addr add 172.25.94.187/24 dev eth0
yum -y nginx
#检查是否有 stream
nginx -V 2>&1 | grep --color -o with-stream
#如果没有stream需要对nginx源码安装进行二次编译
tar -zxvf nginx-*.tar.gz
cd nginx-*
./configure --prefix=/usr/local/nginx --with-http_ssl_module --with-stream
make
sudo make install
#重新加载Nginx配置文件
nginx -s reload
#强制停止Nginx服务
nginx -s stop
#重启nginx
nginx -s reopen
#修改配置文件
vim /etc/nginx/nginx.conf
netstat -anput | grep nginx
nginx -c nginx.conf
三、Netty服务
**1、在Linux上部署启用了 epoll **
epoll:是Linux内核为处理大批量文件描述符而作的改进的poll,是Linux下多路复用IO接口select/poll的增强版本
应用于Linux系统下的应用程序,特别是需要处理大量并发连接的高性能网络服务器。
BIO:同步阻塞IO,也就是传统阻塞型的IO,服务器实现模式是一个连接对应一个线程。客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个链接不做任何事情会造成不必要的线程开销。
NIO:同步非阻塞IO,服务器实现模式是一个线程处理多个请求,客户端发送的链接请求都会注册到多路复用器上,多路复用器轮询到链接有IO请求就进行处理。
AIO:异步非阻塞,AIO引入了异步通道的概念,采用了Proactor模式,简化了程序编写,有效的请求才启动线程,他的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且链接时间较长的应用。
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
@Resource
private NettyServerInitializer nettyServerInitializer;
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup boss = null;
EventLoopGroup worker = null;
ChannelFuture future = null;
ChannelFuture future2 = null;
//厂商编码
Integer factoryCode = null;
@Value("${netty.server.use-epoll}")
boolean epoll = false;
@Value("${netty.server.port1}")
private int port = 8030;
@Value("${netty.server.port2}")
private int port2 = 8050;
@PreDestroy
public void stop() {
if (future != null) {
future.channel().close().addListener(ChannelFutureListener.CLOSE);
future.awaitUninterruptibly();
boss.shutdownGracefully();
worker.shutdownGracefully();
future = null;
logger.info(" 服务关闭 ");
}
}
public void start() {
logger.info(" nettyServer 正在启动");
if (epoll) {
logger.info(" nettyServer 使用epoll模式");
boss = new EpollEventLoopGroup(4);
//指定线程32
worker = new EpollEventLoopGroup(32);
} else {
logger.info(" nettyServer 使用nio模式");
boss = new NioEventLoopGroup(4);
worker = new NioEventLoopGroup(32);
}
logger.info("netty服务器在[" + this.port + "]端口启动监听");
logger.info("netty服务器在[" + this.port2 + "]端口启动监听");
serverBootstrap.group(boss, worker)
// tcp缓冲区:将不能处理的客户端连接请求放到队列里等待
.option(ChannelOption.SO_BACKLOG, 10240)
//多个进程或者线程绑定到同一端口,提高服务器程序的性能
.option(EpollChannelOption.SO_REUSEPORT, true)
//打印info级别的日志
// .handler(new LoggingHandler(LogLevel.INFO))
// 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
.childOption(ChannelOption.TCP_NODELAY, true)
// 可以确保连接在因网络问题中断时能够被及时检测并处理。
.childOption(ChannelOption.SO_KEEPALIVE, false)
// 配置ByteBuf内存分配器
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 配置 编码器、解码器、业务处理
.childHandler(nettyServerInitializer);
if (epoll) {
serverBootstrap.channel(EpollServerSocketChannel.class);
} else {
serverBootstrap.channel(NioServerSocketChannel.class);
}
try {
future = serverBootstrap.bind(port).sync();
future2 = serverBootstrap.bind(port2).sync();
future.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception { //通过回调只关闭自己监听的channel
future.channel().close();
}
});
future2.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.channel().close();
}
});
// 等待服务端监听端口关闭
// future.channel().closeFuture().sync();
} catch (Exception e) {
logger.info("nettyServer 启动时发生异常---------------{}", e);
logger.info(e.getMessage());
} finally {
//这里一定要注释,因为上面没有阻塞了,不注释的话,这里会直接关闭的
//boss.shutdownGracefully();
//worker.shutdownGracefully();
}
}
2、超时配置
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// readerIdleTimeSeconds 读超时;writerIdleTimeSeconds 写超时;allIdaleTimes 读写全超时 300 秒;断开连接
pipeline.addLast(new IdleStateHandler(0, 0, 300, TimeUnit.SECONDS));
pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 22, 2, 1, 0));
//根据端口动态的选择解码器
Integer localPort = socketChannel.localAddress().getPort();
if (localPort == 8050 || localPort == 8055) {
pipeline.addLast("authHandler", authHandler);
pipeline.addLast("messageHandler", messageHandler);
} else if (localPort == 8030 || localPort == 8035) {
pipeline.addLast("authHandler", authHandler2);
pipeline.addLast("messageHandler", messageHandler2);
}
}
在处理器中的应用
/**
* 用户事件触发,发现读超时会调用 根据心跳检测状态去关闭连接
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
String clientId = ChannelStore.getClientId(ctx);
Attribute<Integer> timesAttr = ctx.channel().attr(AttributeKey.valueOf("times"));
Integer timeInt = timesAttr.get();
if (timeInt == null) {
timeInt = 0;
}
String eventDesc = null;
switch (event.state()) {
case READER_IDLE:
eventDesc = "读空闲";
break;
case WRITER_IDLE:
eventDesc = "写空闲";
break;
case ALL_IDLE:
eventDesc = "读写空闲";
break;
}
//获取ip地址信息
InetAddress ip = InetAddress.getLocalHost();
String hostAddress = ip.getHostAddress();
log.info(clientId + " 地址:" + hostAddress + "发生超时事件--" + eventDesc);
timeInt++;
timesAttr.set(timeInt);
if (timeInt > 1) {
//删除ip地址信息
String redisIpAddress = redisTemplateNew.get(clientId + "_IP");
boolean hostBoolean = hostAddress.equals(redisIpAddress);
log.info(hostBoolean + "check :" + clientId + " redisTemplateNewDelete:" + hostAddress + "redisIP:" + redisIpAddress);
if (redisIpAddress != null && hostBoolean) {
redisTemplateNew.delete(clientId + "_IP");
}
log.info(clientId + " 地址:" + hostAddress + ":" + ctx.channel().remoteAddress() + "空闲次数为" + timeInt + "次 关闭连接 " + clientId);
ctx.channel().close();
}
}
}
3、下行API
public class SendApi {
@Resource
private RedisTemplateNew redisTemplateNew;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping(value = "/userinfo")
public UserDto gerUserInfo() {
UserDto user = new UserDto();
user.setUserId("888888");
user.setUserName("holmium");
user.setSex("1");
return user;
}
}
}
}
}