Socket网络编程(五)——TCP数据发送与接收并行

目录

  • 主要实现需求
  • TCP 服务端收发并行重构
    • 启动main方法重构
    • 重构分离收发消息的操作
    • 重构接收消息的操作
    • 重构发送消息
    • TCPServer调用发送消息的逻辑
    • 监听客户端链接逻辑重构
    • Socket、流的退出与关闭
  • TCP 客户端收发并行重构
    • 客户端 main函数重构
    • 客户端接收消息重构
    • 客户端发送消息重构
    • 客户端 linkWith 主方法重构
  • TCP 收发并行重构测试
    • 服务端重构后执行日志
    • 客户端重构后执行日志
  • 源码下载

主要实现需求

多线程收发并行
TCP多线程收发协作
TCP 服务端收发并行重构

TCP 服务端收发并行重构

启动main方法重构

原有的main逻辑如下:
20240229-034932-Jk.png

重构后如下:

public class Server {
 
    public static void main(String[] args) throws IOException {
 
        TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);
        boolean isSucceed = tcpServer.start();
        if(!isSucceed){
            System.out.println("Start TCP server failed.");
        }
        UDPProvider.start(TCPConstants.PORT_SERVER);
 
        // 键盘输入:
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        String str;
        do {
            str = bufferedReader.readLine();
            tcpServer.broadcast(str);
        } while (!"00bye00".equalsIgnoreCase(str));
 
        UDPProvider.stop();
        tcpServer.stop();
    }
}

重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。

重构分离收发消息的操作

创建 ClientHandler.java 重构收发消息操作:

public class ClientHandler {
    private final Socket socket;
    private final ClientReadHandler readHandler;
    private final ClientWriteHandler writeHandler;
    private final CloseNotiry closeNotiry;
 
    public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {
        this.socket = socket;
        this.readHandler = new ClientReadHandler(socket.getInputStream());
        this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
        this.closeNotiry = closeNotiry;
        System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());
    } 
}

重构接收消息的操作

    /**
     * 接收数据
     */
    class ClientReadHandler extends Thread {
 
        private boolean done = false;
        private final InputStream inputStream;
 
        ClientReadHandler(InputStream inputStream){
            this.inputStream = inputStream;
        }
        @Override
        public void run(){
            super.run();
            try {
                // 得到输入流,用于接收数据
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    // 客户端拿到一条数据
                    String str = socketInput.readLine();
                    if(str == null){
                        System.out.println("客户端已无法读取数据!");
                        // 退出当前客户端
                        ClientHandler.this.exitBySelf();
                        break;
                    }
                    // 打印到屏幕
                    System.out.println(str);
                }while (!done);
                socketInput.close();
            }catch (IOException e){
                if(!done){
                    System.out.println("连接异常断开");
                    ClientHandler.this.exitBySelf();
                }
            }finally {
                // 连接关闭
                CloseUtils.close(inputStream);
            }
        }
        void exit(){
            done = true;
            CloseUtils.close(inputStream);
        }
    }

创建一个单独的线程进行接收消息,该线程不需要关闭。

重构发送消息

    /**
     * 发送数据
     */
    class ClientWriteHandler {
        private boolean done = false;
        private final PrintStream printStream;
        private final ExecutorService executorService;
 
        ClientWriteHandler(OutputStream outputStream) {
            this.printStream = new PrintStream(outputStream);
            // 发送消息使用线程池来实现
            this.executorService = Executors.newSingleThreadExecutor();
        }
 
        void exit(){
            done = true;
            CloseUtils.close(printStream);
            executorService.shutdown();
        }
 
        void send(String str) {
            executorService.execute(new WriteRunnable(str));
        }
 
        class WriteRunnable implements  Runnable{
            private final String msg;
 
            WriteRunnable(String msg){
                this.msg = msg;
            }
            @Override
            public void run(){
                if(ClientWriteHandler.this.done){
                    return;
                }
                try {
                    ClientWriteHandler.this.printStream.println(msg);
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }
    }

TCPServer调用发送消息的逻辑

    public void broadcast(String str) {
        for (ClientHandler client : clientHandlerList){
            // 发送消息
            client.send(str);
        }
    }

监听客户端链接逻辑重构

    private List<ClientHandler> clientHandlerList = new ArrayList<>();
 
    /**
     * 监听客户端链接
     */
    private class ClientListener extends Thread {
        private ServerSocket server;
        private boolean done = false;
 
        private ClientListener(int port) throws IOException {
            server = new ServerSocket(port);
            System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());
        }
 
        @Override
        public void run(){
            super.run();
 
            System.out.println("服务器准备就绪~");
            // 等待客户端连接
            do{
                // 得到客户端
                Socket client;
                try {
                    client = server.accept();
                }catch (Exception e){
                    continue;
                }
                try {
                    // 客户端构建异步线程
                    ClientHandler  clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));
                    // 启动线程
                    clientHandler.readToPrint();
                    clientHandlerList.add(clientHandler);
                } catch (IOException e) {
                    e.printStackTrace();
                    System.out.println("客户端连接异常: " + e.getMessage());
                }
 
            }while (!done);
            System.out.println("服务器已关闭!");
        }
        void exit(){
            done = true;
            try {
                server.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

clientHandlerList作为已经建立了连接的客户端的集合,用于管理当前用户的信息。接收与发送都使用该集合。

Socket、流的退出与关闭

    /**
     * 退出、关闭流
     */
    public void exit(){
        readHandler.exit();
        writeHandler.exit();
        CloseUtils.close(socket);
        System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());
    }
 
    /**
     * 发送消息
     * @param str
     */
    public void send(String str){
        writeHandler.send(str);
    }
 
    /**
     * 接收消息
     */
    public void readToPrint() {
        readHandler.exit();
    }
 
    /**
     *  接收、发送消息异常,自动关闭
     */
    private void exitBySelf() {
        exit();
        closeNotiry.onSelfClosed(this);
    }
    /**
     *  关闭流
     */
    public interface CloseNotiry{
        void onSelfClosed(ClientHandler handler);
    }

TCP 客户端收发并行重构

客户端 main函数重构

    public static void main(String[] args) {
        // 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机
        ServerInfo info = UDPSearcher.searchServer(10000);
        System.out.println("Server:" + info);
 
        if( info != null){
            try {
                TCPClient.linkWith(info);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }

客户端接收消息重构

    static class ReadHandler extends Thread{
        private boolean done = false;
        private final InputStream inputStream;
 
        ReadHandler(InputStream inputStream){
            this.inputStream = inputStream;
        }
        @Override
        public void run(){
            try {
                // 得到输入流,用于接收数据
                BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                do {
                    // 客户端拿到一条数据
                    String str = null;
                    try {
                        str = socketInput.readLine();
                    }catch (SocketTimeoutException e){
 
                    }
                    if(str == null){
                        System.out.println("连接已关闭,无法读取数据!");
                        break;
                    }
                    // 打印到屏幕
                    System.out.println(str);
                }while (!done);
                socketInput.close();
            }catch (IOException e){
                if(!done){
                    System.out.println("连接异常断开:" + e.getMessage());
                }
            }finally {
                // 连接关闭
                CloseUtils.close(inputStream);
            }
        }
        void exit(){
            done = true;
            CloseUtils.close(inputStream);
        }
    }

创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。

客户端发送消息重构

    private static void write(Socket client) throws IOException {
        // 构建键盘输入流
        InputStream in = System.in;
        BufferedReader input = new BufferedReader(new InputStreamReader(in));
 
        // 得到Socket输出流,并转换为打印流
        OutputStream outputStream = client.getOutputStream();
        PrintStream socketPrintStream = new PrintStream(outputStream);
 
        boolean flag = true;
        do {
            // 键盘读取一行
            String str = input.readLine();
            // 发送到服务器
            socketPrintStream.println(str);
 
            // 从服务器读取一行
            if("00bye00".equalsIgnoreCase(str)){
                break;
            }
        }while(flag);
        // 资源释放
        socketPrintStream.close();
    }

在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。

客户端 linkWith 主方法重构

     public static void linkWith(ServerInfo info) throws IOException {
        Socket socket = new Socket();
        // 超时时间
        socket.setSoTimeout(3000);
        // 端口2000;超时时间300ms
        socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//
 
        System.out.println("已发起服务器连接,并进入后续流程~");
        System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());
        System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());
 
        try {
            ReadHandler readHandler = new ReadHandler(socket.getInputStream());
            readHandler.start();
            // 发送接收数据
            write(socket);
        }catch (Exception e){
            System.out.println("异常关闭");
        }
 
        // 释放资源
        socket.close();
        System.out.println("客户端已退出~");
    }

原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。

TCP 收发并行重构测试

服务端重构后执行日志

20240229-053719-hC.png

客户端重构后执行日志

20240229-053740-Qt.png

源码下载

下载地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/419832.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python程序的流程

归纳编程学习的感悟&#xff0c; 记录奋斗路上的点滴&#xff0c; 希望能帮到一样刻苦的你&#xff01; 如有不足欢迎指正&#xff01; 共同学习交流&#xff01; &#x1f30e;欢迎各位→点赞 &#x1f44d; 收藏⭐ 留言​&#x1f4dd; 年轻是我们唯一拥有权利去编制梦想的时…

【Java程序设计】【C00324】基于Springboot的高校疫情防控管理系统(有论文)

基于Springboot的高校疫情防控管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的高校疫情防控系统&#xff0c;本系统有管理员、学校管理员、学院管理员、教师以及学生五种角色&#xff1b; 管理员&#x…

智慧灌区项目案例(甘肃省兰州市某重点灌区)

​甘肃省兰州市某重点灌区自上个世纪80年代建成后,灌溉面积达到30万亩,对推动当地农业发展发挥了重要作用。但长期以来,该灌区的水利管理仍主要依靠人工统计记录,缺乏实时监测和精细化管理。为实现灌区管理的现代化升级,甘肃水利局委托星创易联公司设计实施水利信息化项目。 项…

CSAPP-信息的表示和处理

文章目录 概念扫盲思想理解经典好图安全事件 概念扫盲 1.大端高位在前&#xff0c;小端低位在前 2.逻辑运算符&#xff08;&& 、||、&#xff01;&#xff09;与位级运算&#xff08;&、|、~&#xff09;的差异 3.宏可以保证无论代码如何编译&#xff0c;都能生成…

DSP软件架构

&#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;计算机杂记 &#x1f380;CSDN主页 发狂的小花 &#x1f304;人生秘诀&#xff1a;学习的本质就是极致重复! 目录 一 数字信号处理基本运算 二 DSP软件架构 1 哈…

谷歌最强开源大模型亮相!Gemini技术下放,笔记本就能跑,可商用

明敏 发自 凹非寺 量子位 | 公众号 QbitAI 谷歌大模型&#xff0c;开源了&#xff01; 一夜之间&#xff0c;Gemma系列正式上线&#xff0c;全面对外开放。 它采用Gemini同款技术架构&#xff0c;主打开源和轻量级&#xff0c;免费可用、模型权重开源、允许商用&#xff0c;…

【前端素材】推荐优质后台管理系统cassie平台模板(附源码)

一、需求分析 1、系统定义 后台管理系统是一种用于管理网站、应用程序或系统的管理界面&#xff0c;通常由管理员和工作人员使用。它提供了访问和控制网站或应用程序后台功能的工具和界面&#xff0c;使其能够管理用户、内容、数据和其他各种功能。 2、功能需求 后台管理系…

Redis 缓存数据库

redis 中文网 http://www.redis.cn/ redis.net.cn 两种数据库阵营 1.关系型数据库 MySQL Oracle DB2 SQL Server 等基于二维表结构存储数据的文件型磁盘数据库 缺点: 因为数据库的特征是磁盘文件型数据库, 就造成每次查询都有IO操作, 海量数据查询速度较慢 2.NoSQL数据库 …

[技巧]Arcgis之图斑四至范围计算

ArcGIS图层&#xff08;点、线、面三类图形&#xff09;四至范围计算 说明&#xff1a;如下图画出来的框&#xff08;范围标记不是很准&#xff09; &#xff0c;图斑的x最大和x最小&#xff0c;y最大&#xff0c;y最小值则为四至范围值&#xff0c;通俗的讲就是图斑的最小外接…

微信开发者工具-代码管理和码云Github远程仓库集成

目录 思考&#xff1a;IDE如何进行代码管理 代码管理方式 一、自身提供服务 二、Git 扩展 1、环境准备 2、创建项目代码 3、进行项目Git初始化 4、在码云新建远程仓库 5、将项目进行远程仓库关联 三、SVN扩展 四、代码管理 思考&#xff1a;IDE如何进行代码管理 初识开…

力扣2月最后三天的每日一题

力扣2月最后三天的每日一题 前言2867.统计树中的合法路径数目思路确定1e5中的质数统计每个点的连接情况开始对质数点进行处理完整代码 2673.使二叉树所有路径值相等的最小代价思路完整代码 2581.统计可能的树根数目思路建立连通关系将猜测数组变为哈希表&#xff0c;方便查询利…

高级语言期末2007级B卷(计算机学院)

1.从键盘输入任意一个整数&#xff0c;求此整数各位数字中零的个数&#xff0c;以及各位数字中最大者。 #include <stdio.h>int getzero(char *str){char max0;int i0;int count0;while(str[i]!\0){if(str[i]>max)maxstr[i];if(str[i]0)count;i;}printf("%c\n&q…

vue如何重写移动端长按文字复制的功能

移动端长按文字会出现 “复制 全选”的默认弹框&#xff08;这里拿安卓举例吧&#xff09; 但是有的时候需要在长按的时候增加别的功能 这时候就需要禁用原生的弹框然后重写自己的功能 第一步&#xff1a;禁用掉原生弹窗 但是支持划选文字 重要css属性&#xff1a; -webkit…

HarmonyOS Full SDK的安装

OpenHarmony的应用开发工具HUAWEI DevEco Studio现在随着OpenHarmony版本发布而发布,只能在版本发布说明中下载,例如最新版本的OpenHarmony 4.0 Release。对应的需要下载DevEco Studio 4.0 Release,如下图。 图片 下载Full SDK主要有两种方式,一种是通过DevEco Studio下载…

【毛毛讲书】【时间贫困】时间都去哪了?

重磅推荐专栏&#xff1a; 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域&#xff0c;包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用&#xff0c;以及与之相关的人工智能生成内容&#xff…

查看网络连接的netstat

netstat是一个监控TCP/IP网络的非常有用的工具&#xff0c;可以显示路由表、实际的网络连接&#xff0c;以及每一个网络接口设备的状态信息&#xff0c;可以让用户得知目前都有哪些网络连接正在运作。netstat用户显示与IP、TCP、UDP和ICMP协议相关的统计数据&#xff0c;一般用…

文件怎么减小内存?4个简单的方法~

随着我们在电脑或移动设备上创建、下载和收集越来越多的文件&#xff0c;存储空间的管理变得尤为重要。有时&#xff0c;文件太大会占用过多的内存&#xff0c;导致存储空间不足的问题。但别担心&#xff0c;本文将向您介绍五种简单有效的方法&#xff0c;帮助您轻松减小文件的…

微信云开发-- Mac安装 wx-server-sdk依赖

第一次上传部署云函数时&#xff0c;会提示安装依赖wx-server-sdk 一. 判断是否安装wx-server-sdk依赖 先创建一个云函数&#xff0c;然后检查云函数目录。 如果云函数目录下只显示如下图所示三个文件&#xff0c;说明未安装依赖。 如果云函数目录下显示如下图所示四个文件&a…

YOLOv9详细解读,改进提升全面分析(附YOLOv9结构图)

&#x1f951; Welcome to Aedream同学 s blog! &#x1f951; 文章目录 1. 概要1.1 模型结构上的改动:1.2 训练脚本上的改动&#xff1a; 2. 介绍2.1 背景2.2 主要贡献 3. 总体框架3.1 可编程梯度信息&#xff08;PGI&#xff09;3.1.1 辅助可逆分支3.1.2 多级辅助信息 3.2 Ge…

浅析能耗监测系统在大型数据中心的应用

彭姝麟 Acrelpsl 1总体设计 大型数据中心能耗监测系统包含硬件和软件两大部分&#xff0c;其硬件组成主要包括监控服务器、主机设备、网络设备、环境参数传感器、通风模块等&#xff0c;总体采集逻辑采用三级监控体系。一级为主机设备&#xff0c;作为系统的应用层&#xff0c…