基于Netty实现TCP通信

创建一个Maven项目添加下面依赖

    <dependencies>
        <!-- 日志依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.78</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.84.Final</version>
        </dependency>
    </dependencies>

编码解码器

package com.example.nettydemo.coder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

import java.nio.charset.StandardCharsets;


public class NettyEncoder extends MessageToByteEncoder<String> {
    public NettyEncoder() {
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
        byte[] byteMsg = msg.getBytes(StandardCharsets.UTF_8);
        int msgLength = byteMsg.length;
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(4 + byteMsg.length);
        buf.writeInt(msgLength);
        buf.writeBytes(byteMsg, 0, msgLength);
        out.writeBytes(buf);
        buf.release();
    }
}

package com.example.nettydemo.coder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.List;


@Slf4j
public class NettyDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int beginReader = in.readerIndex();
        int dataLength = in.readInt();
        if (in.readableBytes() < dataLength) {
            in.readerIndex(beginReader);
        } else {
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            String str = new String(data, 0, dataLength, StandardCharsets.UTF_8);
            out.add(str);
        }
    }
}

服务端

package com.example.nettydemo.server;

import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.Map;


@Slf4j
public class TcpServer {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private ServerBootstrap server;
    private ChannelFuture channelFuture;
    private Integer port;

    public TcpServer(Integer port) {
        this.port = port;

        // nio连接处理池
        this.bossGroup = new NioEventLoopGroup();
        // 处理事件池
        this.workerGroup = new NioEventLoopGroup();
        server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 自定义处理类
                        ch.pipeline().addLast(new NettyDecoder());
                        ch.pipeline().addLast(new NettyEncoder());
                        ch.pipeline().addLast(new TcpServerHandler());
                    }
                });
        server.option(ChannelOption.SO_BACKLOG, 128);
        server.childOption(ChannelOption.SO_KEEPALIVE, true);
    }

    public synchronized void startListen() {
        try {
            // 绑定到指定端口
            channelFuture = server.bind(port).sync();
            log.info("netty服务器在[{}]端口启动监听", port);
        } catch (Exception e) {
            log.error("netty服务器在[{}]端口启动监听失败", port);
            e.printStackTrace();
        }
    }

    public void sendMessageToClient(String clientIp, Object msg) {
        Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
        Channel channel = channelMap.get(clientIp);
        String sendStr;
        try {
            sendStr = OBJECT_MAPPER.writeValueAsString(msg);
        } catch (JsonGenerationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            log.info("向客户端 {} 发送消息内容:{}", clientIp, sendStr);
            channel.writeAndFlush(sendStr);
        } catch (Exception var4) {
            log.error("向客户端 {} 发送消息失败,消息内容:{}", clientIp, sendStr);
            throw new RuntimeException(var4);
        }
    }

    public void pushMessageToClients(Object msg) {
        Map<String, Channel> channelMap = TcpServerHandler.channelSkipMap.get(port);
        if (channelMap != null && !channelMap.isEmpty()) {
            channelMap.forEach((k, v) -> sendMessageToClient(k, msg));
        }
    }
}

package com.example.nettydemo.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;


@Slf4j
public class TcpServerHandler extends SimpleChannelInboundHandler<String> {

    /**
     * 用跳表存储连接channel
     */
    public static Map<Integer, Map<String, Channel>> channelSkipMap = new ConcurrentSkipListMap<>();

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("应用程序的监听通道异常!");
        cause.printStackTrace();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // 获取每个用户端连接的ip
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
        // 本地端口做键
        int localPort = localSocket.getPort();
        Map<String, Channel> channelMap = channelSkipMap.get(localPort);
        if (channelMap == null || channelMap.isEmpty()) {
            channelMap = new HashMap<>(4);
        }
        channelMap.put(clientIp, channel);
        channelSkipMap.put(localPort, channelMap);
        log.info("应用程序添加监听通道,与客户端:{} 建立连接成功!", clientIp);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 获取每个用户端连接的ip
        Channel channel = ctx.channel();
        InetSocketAddress localSocket = (InetSocketAddress) channel.localAddress();
        int localPort = localSocket.getPort();
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        String clientIp = ipSocket.getAddress().getHostAddress();
        Map<String, Channel> channelMap = channelSkipMap.get(localPort);
        channelMap.remove(clientIp);
        log.info("应用程序移除监听通道,与客户端:{} 断开连接!", clientIp);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {
        Channel channel = channelHandlerContext.channel();
        // 获取每个用户端连接的ip
        InetSocketAddress ipSocket = (InetSocketAddress) channel.remoteAddress();
        log.info("接收到客户端: {} 应用数据:{}", ipSocket, msg);
    }
}

package com.example.nettydemo.server;


public class ServerTest {
    public static void main(String[] args) {
        TcpServer tcpServer = new TcpServer(40001);
        tcpServer.startListen();
        while (true) {
            try {
                // 每5秒向客户端发送一次 "test-朱上林123"
                Thread.sleep(5000);
                tcpServer.pushMessageToClients("test-朱上林123");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

客户端

package com.example.nettydemo.client;

import com.example.nettydemo.coder.NettyDecoder;
import com.example.nettydemo.coder.NettyEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;


@Slf4j
public class TcpClient {
    private EventLoopGroup group;
    private ChannelFuture channelFuture;
    private final String ip;
    private final Integer port;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public TcpClient(String ip, Integer port) {
        this.ip = ip;
        this.port = port;
    }

    /**
     * 建立连接
     *
     */
    public synchronized void connectServer() {
        log.info("开始建立连接,ip:{}, port:{}", ip, port);
        // 生命nio连接池
        this.group = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap();
            // 配置解码器以及消息处理类
            b.group(this.group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyEncoder());
                            pipeline.addLast(new NettyDecoder());
                            pipeline.addLast(new TcpClientHandler());
                        }
                    });

            // 开始连接
            this.channelFuture = b.connect(ip, port).sync();
        } catch (Exception var4) {
            log.error("连接建立失败,ip:{}, port:{}", ip, port);
            this.group.shutdownGracefully();
            var4.printStackTrace();
        }
    }

    /**
     * 关闭连接
     */
    public void close() {
        this.group.shutdownGracefully();
    }

    /**
     * 发送消息
     *
     * @param msg
     */
    public synchronized void sendCommonMsg(Object msg) {
        String sendStr;
        if (!getConnectStatus()) {
            connectServer();
        }
        try {
            sendStr = objectMapper.writeValueAsString(msg);
        } catch (JsonMappingException e) {
            throw new RuntimeException(e);
        } catch (JsonGenerationException e) {
            throw new RuntimeException(e);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        try {
            log.info("发送消息内容:{}", sendStr);
            this.channelFuture.channel().writeAndFlush(sendStr);
        } catch (Exception var4) {
            log.error("发送消息失败,消息内容:{}", sendStr);
            throw new RuntimeException(var4);
        }
    }

    /**
     * 获取当前连接状态
     */
    public Boolean getConnectStatus() {
        return group != null && !group.isShutdown() && !group.isShuttingDown();
    }
}

package com.example.nettydemo.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * 读取事件
     *
     * @param channelHandlerContext
     * @param msg
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
        log.info("服务返回消息 :{}", msg);
    }

    /**
     * 发生异常
     *
     * @param channelHandlerContext
     * @param throwable
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) {
        log.error("通信发生异常:" + throwable.getMessage());
        channelHandlerContext.close();
    }
}

package com.example.nettydemo.client;


public class TcpClientTest {
    public static void main(String[] args) {
        TcpClient tcpClient = new TcpClient("127.0.0.1", 40001);
        // 客户端连接到服务器后,向服务器发送一条消息:
        tcpClient.connectServer();
        tcpClient.sendCommonMsg("我是Client,刚刚是我连接到你的!");
    }
}

启动服务端和客户端实现通信

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
下课!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/204461.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

帮亲戚个忙,闲来有事用php写个58商铺出租转让信息抓取

最近亲戚想做点小超市生意&#xff0c;但是又不懂互联网&#xff0c;信息获取有点闭塞。知道我身在互联网大潮中&#xff0c;想让我帮忙看看网上有没有商铺转让的。心想&#xff0c;这不是小菜一碟&#xff0c;大显身手的时候来了&#xff0c;大概去58瞅了瞅&#xff0c;这玩意…

【问题定位】阅读Nacos服务注册与发现的源码解决服务注册异常

问题现象 本地服务启动&#xff0c;发现调用FeignClient的服务&#xff0c;跑的是sit的服务&#xff0c;而本地是uat的环境配置。 问题跟踪 feign.SynchronousMethodHandler#invoke&#xff0c;调用远程服务。 public Object invoke(Object[] argv) throws Throwable {Reque…

信号类型(通信)——高斯最小频率键控(GMSK)

系列文章目录 《信号类型&#xff08;通信&#xff09;——仿真》 《信号类型&#xff08;通信&#xff09;——QAM调制信号》 《信号类型&#xff08;通信&#xff09;——QPSK、OQPSK、IJF_OQPSK调制信号》 《信号类型&#xff08;通信&#xff09;——最小频移键控&…

【秒懂JDK,JRE,JVM的关系】

&#x1f320;作者&#xff1a;TheMythWS. &#x1f387;座右铭&#xff1a;不走心的努力都是在敷衍自己&#xff0c;让自己所做的选择&#xff0c;熠熠发光。 ​ JDK与JRE与JVM的关系 先用一张图来直观感受JDK JRE JVM之间的关系&#xff1a; JDK与JRE的关系 先说JDK和JRE…

卷轴模式:金融领域的新趋势

卷轴模式在金融领域逐渐崭露头角&#xff0c;成为一种新型的投资策略。这种模式基于完成任务或达成特定目标来获取积分&#xff0c;利用这些积分进行投资或获取现实物品。它不同于传统的资金盘&#xff0c;而是以一种更稳健的方式运作&#xff0c;避免了资金盘的风险。 一、卷轴…

替代升级虚拟化 | ZStack Cloud云平台助力中节能镇江公司核心业务上云

数字经济正加速推动各行各业的高质量升级发展&#xff0c;云计算是数字经济的核心底层基础设施。作为云基础软件企业&#xff0c;云轴科技ZStack 坚持自主创新&#xff0c;自研架构&#xff0c;产品矩阵可全面覆盖数据中心云基础设施&#xff0c;针对虚拟化资源实现纳管、替代和…

springboot自定义校验注解的实现

自定义校验注解的实现 通过谷粒商城项目学习了自定义校验器的实现一、编写自定义校验注解二、自定义注解的校验器三、关联自定义的校验器和自定义的校验注解总结 通过谷粒商城项目学习了自定义校验器的实现 近日在学习雷神的谷粒商城项目&#xff0c;其中有一个自定义校验的实…

网络字节序

字节序的概念和示例 CPU向内存保存数据的方式有2种&#xff0c;所以CPU解析数据的方式也分为2种。CPU保存和解析数据的方式叫字节序&#xff0c;分为小端字节序和大端字节序。 大端字节序&#xff1a;高位字节存放到低位地址。 小端字节序&#xff1a;高位字节存放到高位地址。…

2023年第三届中国高校大数据挑战赛思路及代码

比赛时间&#xff1a;2023.12.28 08:00 至 2023.12.31 20:00 赛题方向介绍 1、大数据统计分析方向 涉及内容包含&#xff1a;数据的清洗、数据的预测、数据之间的关联分析、综合评价、分类与判别等 2、文本或图象分析方向 涉及内容包含&#xff1a;计算机视觉基础、特征匹配…

基于YOLOv8深度学习的火焰烟雾检测系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

[架构之路-253]:目标系统 - 设计方法 - 软件工程 - 软件设计 - 结构化设计的主要评估指标:高内聚(模块内部)、低耦合(模块之间)的含义

目录 前言&#xff1a; 一、软件工程中的软件设计种类&#xff1a;根据宏观到微观分 &#xff08;1&#xff09;软件架构设计&#xff08;层次划分、模块划分、职责分工&#xff09;&#xff1a; &#xff08;2&#xff09;软件高层设计、概要设计&#xff08;功能模块的接…

样品实验K-KAT348羧酸铋催化剂TDS说明书

样品实验K-KAT348羧酸铋催化剂TDS说明书 50克 100克 200克

【图像分类】基于深度学习的中草药分类系统的设计与实现(ResNet网络,附代码和数据集)

写在前面: 首先感谢兄弟们的关注和订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手共进,共创辉煌。(专栏订阅用户订阅专栏后免费提供数据集和源码一份,超级VIP用户不在服务范围之内,不想订阅专栏的兄弟们可以私信…

【攻防世界-misc】来自银河的信号

1.下载并打开文件&#xff0c;是个音频软件 2.由于打开音频出现的声音类似于无线波&#xff0c;因此需要用RX-SSTV工具打开&#xff0c; RX-SSTV代表“接收图像慢扫描电视”的意思。慢扫描电视是一种通过无线电进行图像传输的技术&#xff0c;通常用于业余无线电领域。RX-SST…

数电实验-----触发器的原理与应用(Quartus II )

目录 触发器概述 1.基本RS触发器 2.同步触发器 &#xff08;1&#xff09;RS同步触发器 &#xff08;2&#xff09;D触发器 3.边沿触发器 &#xff08;1&#xff09;JK触发器 &#xff08;2&#xff09;T触发器 JK触发器的转换 &#xff08;1&#xff09;JK触发器转换…

消除笔怎么用?手把手教你一键智能消除杂物

消除笔怎么用&#xff1f;消除笔是一种非常实用的工具&#xff0c;可以帮助我们快速修复图片中的小问题。无论是想要消除照片中的路人还是进行一些修改&#xff0c;消除笔都可以轻松地帮助我们实现。 以下是使用消除笔的步骤&#xff1a; 1、打开水印云软件&#xff0c;并在工具…

检索增强生成架构详解【RAG】

生成式AI技术很强大&#xff0c;但它们受到知识的限制。 虽然像 ChatGPT 这样的LLM可以执行许多任务&#xff0c;但每个LLM的基线知识都存在基于其训练数据的差距。 如果你要求LLM写一些关于最近趋势或事件的文章&#xff0c;LLM不会知道你在说什么&#xff0c;而且回答最好是混…

Gateway(拦截器/路由)入门

目录 1、概述2、实现3、网关模块3.1 AbstractGatewayFilterFactory类3.2 AbstractGatewayFilterFactory和 GlobalFilter区别 4、服务模块5、服务之间请求传递请求头6、 代码结构优化 1、概述 微服务框架中网关提供统一的路由方式&#xff0c;并且基于 Filter 链的方式提供了网…

阿里云效一键部署前后端

静态站点到OSS 阿里云-云效&#xff0c;阿里云企业级一站式 DevOps&#xff0c;可以免费使用&#xff08;会限制人数、流水线数量等&#xff0c;个人项目够用了&#xff09;。相关文章 CI 持续集成 - 阿里云云效 OSS 是对象存储的意思&#xff0c;一般一个项目对应一个 Bucke…

YOLOv8优化策略:检测头结构全新创新篇 | RT-DETR检测头助力,即插即用

🚀🚀🚀本文改进:RT-DETR检测头助力YOLOv8检测,保持v8轻量级的同时提升检测精度 🚀🚀🚀YOLOv8改进专栏:http://t.csdnimg.cn/hGhVK 学姐带你学习YOLOv8,从入门到创新,轻轻松松搞定科研; 1.RT-DETR介绍 论文: https://arxiv.org/pdf/2304.08069.pdf 摘要:…