JAVA异步的TCP 通讯-服务端

一、服务端代码示例

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AdvancedAsyncTCPServer {
    private static final int PORT = 8888;
    private static final int BUFFER_SIZE = 1024;
    private final AsynchronousServerSocketChannel serverSocketChannel;
    private final ExecutorService threadPool;

    public AdvancedAsyncTCPServer() throws IOException {
        // 创建异步服务器套接字通道
        serverSocketChannel = AsynchronousServerSocketChannel.open();
        // 绑定到指定端口
        serverSocketChannel.bind(new InetSocketAddress(PORT));
        // 创建一个固定大小的线程池,用于处理业务逻辑
        threadPool = Executors.newFixedThreadPool(10);
        System.out.println("Server started on port " + PORT);
    }

    public void start() {
        // 开始接受客户端连接
        acceptConnections();
    }

    private void acceptConnections() {
        // 异步接受客户端连接
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                // 继续接受下一个连接
                acceptConnections();
                // 处理新连接
                handleConnection(clientChannel);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {
                System.err.println("Failed to accept connection: " + exc.getMessage());
            }
        });
    }

    private void handleConnection(AsynchronousSocketChannel clientChannel) {
        // 创建缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        // 异步读取客户端数据
        clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer buffer) {
                if (bytesRead > 0) {
                    buffer.flip();
                    byte[] data = new byte[buffer.remaining()];
                    buffer.get(data);
                    String message = new String(data);
                    System.out.println("Received message from client: " + message);

                    // 使用线程池处理业务逻辑
                    threadPool.submit(() -> {
                        try {
                            // 模拟业务处理
                            String responseMessage = "Server processed: " + message;
                            ByteBuffer responseBuffer = ByteBuffer.wrap(responseMessage.getBytes());
                            // 异步发送响应给客户端
                            clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                                @Override
                                public void completed(Integer bytesWritten, ByteBuffer buffer) {
                                    System.out.println("Response sent to client");
                                    try {
                                        // 继续读取客户端数据
                                        buffer.clear();
                                        clientChannel.read(buffer, buffer, this);
                                    } catch (Exception e) {
                                        closeChannel(clientChannel);
                                    }
                                }

                                @Override
                                public void failed(Throwable exc, ByteBuffer buffer) {
                                    System.err.println("Failed to send response: " + exc.getMessage());
                                    closeChannel(clientChannel);
                                }
                            });
                        } catch (Exception e) {
                            closeChannel(clientChannel);
                        }
                    });
                } else if (bytesRead == -1) {
                    // 客户端关闭连接
                    closeChannel(clientChannel);
                } else {
                    // 继续读取客户端数据
                    buffer.clear();
                    clientChannel.read(buffer, buffer, this);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer buffer) {
                System.err.println("Failed to read data: " + exc.getMessage());
                closeChannel(clientChannel);
            }
        });
    }

    private void closeChannel(AsynchronousSocketChannel channel) {
        try {
            System.out.println("Closing client connection");
            channel.close();
        } catch (IOException e) {
            System.err.println("Error closing channel: " + e.getMessage());
        }
    }

    public void stop() {
        try {
            // 关闭服务器套接字通道
            serverSocketChannel.close();
            // 关闭线程池
            threadPool.shutdown();
        } catch (IOException e) {
            System.err.println("Error stopping server: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        try {
            AdvancedAsyncTCPServer server = new AdvancedAsyncTCPServer();
            server.start();
        } catch (IOException e) {
            System.err.println("Error starting server: " + e.getMessage());
        }
    }
}

二、代码分析

AdvancedAsyncTCPServer 类

  1. 构造函数:创建 AsynchronousServerSocketChannel 并绑定到指定端口,同时创建一个固定大小的线程池用于处理业务逻辑。

  2. start() 方法:开始接受客户端连接。
  3. acceptConnections() 方法:异步接受客户端连接,使用 CompletionHandler 处理连接结果。
  4. handleConnection() 方法:处理新连接,异步读取客户端数据,并使用线程池处理业务逻辑。
  5. closeChannel() 方法:关闭客户端通道。
  6. stop() 方法:关闭服务器套接字通道和线程池。

CompletionHandler

  1. 用于处理异步操作的完成结果,包括连接、读取和写入操作。
  2. 在 completed() 方法中处理成功的操作,在 failed() 方法中处理失败的操作。

线程池

  1. 使用 Executors.newFixedThreadPool(10) 创建一个固定大小的线程池,用于处理业务逻辑,避免阻塞 I/O 操作。

三、优点

  • 异步 I/O:使用 Java NIO 2 的异步 I/O 功能,提高了服务器的并发处理能力。
  • 线程池:使用线程池处理业务逻辑,避免了创建过多线程导致的性能问题。
  • 异常处理:对各种异常情况进行了处理,提高了代码的健壮性。
  • 资源管理:在关闭服务器时,正确关闭服务器套接字通道和线程池,避免资源泄漏。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/965162.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue Dom截图插件,截图转Base64 html2canvas

安装插件 npm install html2canvas --save插件使用 <template><div style"padding: 10px;"><div ref"imageTofile" class"box">发生什么事了</div><button click"toImage" style"margin: 10px;&quo…

Flink2支持提交StreamGraph到Flink集群

最近研究Flink源码的时候&#xff0c;发现Flink已经支持提交StreamGraph到集群了&#xff0c;替换掉了原来的提交JobGraph。 新增ExecutionPlan接口&#xff0c;将JobGraph和StreamGraph作为实现。 Flink集群Dispatcher也进行了修改&#xff0c;从JobGraph改成了接口Executio…

Unity扩展编辑器使用整理(一)

准备工作 在Unity工程中新建Editor文件夹存放编辑器脚本&#xff0c; Unity中其他的特殊文件夹可以参考官方文档链接&#xff0c;如下&#xff1a; Unity - 手册&#xff1a;保留文件夹名称参考 (unity3d.com) 一、菜单栏扩展 1.增加顶部菜单栏选项 使用MenuItem&#xff…

2025年02月05日Github流行趋势

项目名称&#xff1a;OCRmyPDF 项目地址url&#xff1a;https://github.com/ocrmypdf/OCRmyPDF项目语言&#xff1a;Python历史star数&#xff1a;15872今日star数&#xff1a;157项目维护者&#xff1a;jbarlow83, fritz-hh, apps/dependabot, mawi12345, mara004项目简介&…

ASP.NET Core中间件的概念及基本使用

什么是中间件 中间件是ASP.NET Core的核心组件&#xff0c;MVC框架、响应缓存、身份验证、CORS、Swagger等都是内置中间件。 广义上来讲&#xff1a;Tomcat、WebLogic、Redis、IIS&#xff1b;狭义上来讲&#xff0c;ASP.NET Core中的中间件指ASP.NET Core中的一个组件。中间件…

【狂热算法篇】并查集:探秘图论中的 “连通神器”,解锁动态连通性的神秘力量

嘿&#xff0c;朋友们&#xff01;喜欢这个并查集的讲解吗 记得点个关注哦&#xff0c;让我们一起探索算法的奥秘&#xff0c;别忘了一键三连&#xff0c;你的支持是我最大的动力&#xff01; 欢迎拜访&#xff1a;羑悻的小杀马特.-CSDN博客 本篇主题&#xff1a;深度剖析并查…

Jupyter Lab的使用

Lab与Notebook的区别: Jupyter Lab和Jupyter notebook有什么区别&#xff0c;这里找到一篇博客不过我没细看&#xff0c; Jupyter Lab和Jupyter Notebook的区别 - codersgl - 博客园 使用起来Lab就是一个更齐全、功能更高级的notebook&#xff0c; 启用滚动输出: 有时候一个…

C++【深入 STL--list 之 迭代器与反向迭代器】

接前面的手撕list(上)文章&#xff0c;由于本人对于list的了解再一次加深。本文再次对list进行深入的分析与实现。旨在再一次梳理思路&#xff0c;修炼代码内功。 1、list 基础架构 list底层为双向带头循环链表&#xff0c;问题是如何来搭建这个list类。可以进行下面的考虑&am…

Games104——游戏引擎Gameplay玩法系统:基础AI

这里写目录标题 寻路/导航系统NavigationWalkable AreaWaypoint NetworkGridNavigation Mesh&#xff08;寻路网格&#xff09;Sparse Voxel Octree Path FindingDijkstra Algorithm迪杰斯特拉算法A Star&#xff08;A*算法&#xff09; Path Smoothing Steering系统Crowd Simu…

2024最新版Node.js详细安装教程(含npm配置淘宝最新镜像地址)

一&#xff1a;Node.js安装 浏览器中搜索Nodejs&#xff0c;或直接用网址:Node.js — 在任何地方运行 JavaScript 建议此处下载长期支持版本&#xff08;红框内&#xff09;: 开始下载&#xff0c;完成后打开文件: 进入安装界面&#xff0c;在此处勾选&#xff0c;再点击n…

高效 MyBatis SQL 写法一

高效 MyBatis SQL 写法一 前言 MyBatis 作为一款优秀的持久层框架&#xff0c;极大地简化了数据库操作。 然而&#xff0c;在实际开发中&#xff0c;XML 配置的编写仍然可能显得繁琐。 本文将分享一些 MyBatis 动态 SQL 的优质写法&#xff0c;帮助开发者提升效率并减少错误…

C语言按位取反【~】详解,含原码反码补码的0基础讲解【原码反码补码严格意义上来说属于计算机组成原理的范畴,不过这也是学好编程初级阶段的必修课】

目录 概述【适合0基础看的简要描述】&#xff1a; 上述加粗下划线的内容提取版&#xff1a; 从上述概述中提取的核心知识点&#xff0c;需背诵&#xff1a; 整数【包含整数&#xff0c;负整数和0】的原码反码补码相互转换的过程图示&#xff1a; 过程详细刨析&#xff1a;…

如何安装PHP依赖库 更新2025.2.3

要在PHP项目中安装依赖&#xff0c;首先需要确保你的系统已经安装了Composer。Composer是PHP的依赖管理工具&#xff0c;它允许你声明项目所需的库&#xff0c;并管理它们。以下是如何安装Composer和在PHP项目中安装依赖的步骤&#xff1a; 一. 安装Composer 对于Windows用户…

【通俗易懂说模型】线性回归(附深度学习、机器学习发展史)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;深度学习_十二月的猫的博客-CSDN博客 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 1. 前言 2. …

硬件电路基础

目录 1. 电学基础 1.1 原子 1.2 电压 1.3 电流 1.电流方向&#xff1a; 正极->负极,正电荷定向移动方向为电流方向&#xff0c;与电子定向移动方向相反。 2.电荷&#xff08;这里表示负电荷&#xff09;运动方向&#xff1a; 与电流方向相反 1.4 测电压的时候 2. 地线…

【含文档+PPT+源码】基于Python爬虫二手房价格预测与可视化系统的设计与实现

项目介绍 本课程演示的是一款基于Python爬虫二手房价格预测与可视化系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 带你从零开始部署运行本套系统 该项…

【数据结构】树哈希

目录 一、树的同构1. 定义2. 具体理解(1) 结点对应(2) 孩子相同(3) 递归性质 3. 示例 二、树哈希1.定义2.哈希过程&#xff08;1&#xff09;叶节点哈希&#xff08;2&#xff09;非叶节点哈希&#xff08;3&#xff09;组合哈希值 3.性质&#xff08;1&#xff09; 唯一性 \re…

渗透测试之文件包含漏洞 超详细的文件包含漏洞文章

目录 说明 通常分为两种类型&#xff1a; 本地文件包含 典型的攻击方式1&#xff1a; 影响&#xff1a; 典型的攻击方式2&#xff1a; 包含路径解释&#xff1a; 日志包含漏洞&#xff1a; 操作原理 包含漏洞读取文件 文件包含漏洞远程代码执行漏洞: 远程文件包含…

Mysql:数据库

Mysql 一、数据库概念&#xff1f;二、MySQL架构三、SQL语句分类四、数据库操作4.1 数据库创建4.2 数据库字符集和校验规则4.3 数据库修改4.4 数据库删除4.4 数据库备份和恢复其他 五、表操作5.1 创建表5.2 修改表5.3 删除表 六、表的增删改查6.1 Create(创建):数据新增1&#…

ChatGPT怎么回事?

纯属发现&#xff0c;调侃一下~ 这段时间deepseek不是特别火吗&#xff0c;尤其是它的推理功能&#xff0c;突发奇想&#xff0c;想用deepseek回答一些问题&#xff0c;回答一个问题之后就回复服务器繁忙&#xff08;估计还在被攻击吧~_~&#xff09; 然后就转向了GPT&#xf…