通过 NIO + 多线程 提升硬件设备与系统的数据传输性能

一、项目展示

下图(模拟的数据可视化大屏)中数据是动态显示的

在这里插入图片描述

二、项目简介

描述:使用Client模拟了硬件设备,比如可燃气体浓度检测器。Client通过Socket与Server建立连接,Server保存数据到txt文件,并使用WebSocket将数据推送到数据可视化大屏

工作:通过多线程+NIO优化了Server性能

原理图:

在这里插入图片描述

三、代码实现

Server

NioSocketServerService.java

package com.example.server;

import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

@Service
public class NioSocketServerService {

    private static final int PORT = 8081;
    private static final int TIMEOUT = 5000;
    private static final BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void startServer() {
        for (int i = 0; i < 4; i++) {
            new Thread(new FileWriterTask(writeQueue)).start();
        }

        new Thread(() -> {
            try {
                Selector selector = Selector.open();
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.bind(new InetSocketAddress(PORT));
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

                System.out.println("Server is listening on port " + PORT);

                while (true) {
                    if (selector.select(TIMEOUT) == 0) {
                        continue;
                    }

                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();

                        try {
                            if (key.isAcceptable()) {
                                handleAccept(key, selector);
                            } else if (key.isReadable()) {
                                handleRead(key);
                            }
                        } catch (IOException e) {
                            key.cancel();               // 取消键的注册,这意味着该通道不再被选择器监视
                            key.channel().close();      // 关闭通道,释放资源
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }

    private void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
        SocketChannelHandler.addBuffer(socketChannel);
        System.out.println("New client connected: " + socketChannel.getRemoteAddress());
    }

    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        SocketChannelHandler.readFromChannel(socketChannel, writeQueue);
    }
}

SocketChannelHandler.java

package com.example.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class SocketChannelHandler {

    private static final String DIRECTORY = "data/";
    private static final int BUFFER_SIZE = 2048;
    private static final Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void addBuffer(SocketChannel socketChannel) {
        bufferMap.put(socketChannel, ByteBuffer.allocateDirect(BUFFER_SIZE));
    }

    public static void readFromChannel(SocketChannel socketChannel, BlockingQueue<String> writeQueue) throws IOException {
        ByteBuffer buffer = bufferMap.get(socketChannel);
        buffer.clear();
        int bytesRead;

        try {
            bytesRead = socketChannel.read(buffer);
        } catch (IOException e) {
            System.err.println("Error reading from socket: " + e.getMessage());
            socketChannel.close();
            bufferMap.remove(socketChannel);
            return;
        }

        if (bytesRead == -1) {          // 读取到-1表示客户端已关闭连接,移除缓冲区
            socketChannel.close();
            bufferMap.remove(socketChannel);
        } else if (bytesRead > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            String message = new String(data);

            String[] dataParts = message.split(" : ", 2);
            if (dataParts.length == 2) {
                String deviceId = dataParts[0].trim();
                String deviceData = dataParts[1].trim();
                String currentTime = LocalDateTime.now().format(dateTimeFormatter);
                String dataToWrite = DIRECTORY + deviceId + ".txt : " + currentTime + " : " + deviceData;

                writeQueue.add(dataToWrite);

                WebSocketServer.sendMessage(deviceId + " : " + currentTime + " : " + deviceData);
            }
        }
    }
}

FileWriterTask.java

package com.example.server;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class FileWriterTask implements Runnable {

    private static final int BATCH_SIZE = 10;

    /**
     * BlockingQueue是JUC包中的一个接口,提供了线程安全的队列操作
     * 支持阻塞的put和take操作,当队列满时put会阻塞,直到队列有空位;当队列空时take会阻塞,直到队列有元素
     * 其主要实现包括:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue
     */
    private final BlockingQueue<String> writeQueue;

    public FileWriterTask(BlockingQueue<String> writeQueue) {
        this.writeQueue = writeQueue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                List<String> dataList = new ArrayList<>();

                // 读取BATCH_SIZE条数据,或等待超时后退出循环
                while (dataList.size() < BATCH_SIZE) {
                    String data = writeQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (data != null) {
                        dataList.add(data);
                    } else {
                        break;
                    }
                }

                // 如果读取到数据,则将其写入文件
                if (!dataList.isEmpty()) {
                    for (String data : dataList) {
                        String[] dataParts = data.split(" : ");
                        if (dataParts.length == 3) {
                            String fileName = dataParts[0].trim();
                            try (FileChannel fileChannel = FileChannel.open(Paths.get(fileName), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
                                ByteBuffer buffer = ByteBuffer.wrap((data + System.lineSeparator()).getBytes());
                                fileChannel.write(buffer);
                            }
                        }
                    }
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Client

MultiThreadedSocketClient.java

package com.example.client;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MultiThreadedSocketClient {
    public static void main(String[] args) {
        String hostname = "localhost";
        int port = 8081;
        int numberOfDevices = 1000;
        ExecutorService executor = Executors.newFixedThreadPool(numberOfDevices);

        for (int i = 1; i <= numberOfDevices; i++) {
            String deviceId = "Device" + i;
            executor.submit(new DeviceClient(hostname, port, deviceId));
        }

        executor.shutdown();
    }
}

DeviceClient.java

package com.example.client;

import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.TimeUnit;

class DeviceClient implements Runnable {
    private String hostname;
    private int port;
    private String deviceId;
    private Random random = new Random();
    private static final int MAX_RETRIES = 15;
    private static final int RETRY_DELAY_MS = 1000;

    public DeviceClient(String hostname, int port, String deviceId) {
        this.hostname = hostname;
        this.port = port;
        this.deviceId = deviceId;
    }

    @Override
    public void run() {
        int attempt = 0;
        boolean connected = false;

        while (attempt < MAX_RETRIES && !connected) {
            try {
                Thread.sleep(random.nextInt(15000));
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            try (Socket socket = new Socket(hostname, port);
                 PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {

                connected = true;

                while (true) {
                    try {
                        String data = deviceId + " : " + random.nextInt(50000);
                        out.println(data);
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } catch (UnknownHostException e) {
                System.err.println("Unknown host: " + hostname);
                break;
            } catch (IOException e) {
                attempt++;
                int randomDelay = random.nextInt(10000);
                System.err.println(deviceId + "\tAttempt " + attempt + " - Connection refused. Retrying in " + (RETRY_DELAY_MS + randomDelay) + "ms...");
                try {
                    Thread.sleep(RETRY_DELAY_MS + randomDelay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        if (!connected) {
            System.err.println("Failed to connect after " + MAX_RETRIES + " attempts.");
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/643580.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

操作系统总结4----死锁的处理策略总结

目录 2.4.2 死锁的处理策略-----预防死锁 &#xff08;1&#xff09;知识总览 &#xff08;2&#xff09;破环互斥条件 &#xff08;3&#xff09;破环不剥夺条件 &#xff08;4&#xff09;破环求情和保持条件 &#xff08;5&#xff09;破环循环等待条件 总结 2.4.3 死…

广告圈策划大师课:活动策划到品牌企划的深度解析

对于刚接触营销策划的新人来说&#xff0c;在这个知识密集型行业里生存&#xff0c;要学习非常多各种意思相近的概念&#xff0c;常常让人感到头疼&#xff0c;难以区分。 这里对这些策划概念进行深入解析&#xff0c;帮助您轻松理清各自的含义和区别。 1. 活动策划&#xff…

操作抖音小店一直不出单怎么办?只需要做好这两点就可以了!

大家好&#xff0c;我是电商小V 最近很多新手小伙伴来咨询我说自己操作抖音小店&#xff0c;自己的店铺长时间不出单应该怎么办&#xff1f;今天咱们就来详细的说一下&#xff0c; 咱们要清楚的就是自己的店铺不出&#xff0c;只需要咱们做好这两点就可以了&#xff0c; 第一点…

华为机考入门python3--(29)牛客29-字符串加解密

分类&#xff1a;字符变换 知识点&#xff1a; 字符是字母 char.isalpha() 字符是小写字母 char.islower() 字符是数字 char.isdigit() b变C chr((ord(b) - ord(a) 1) % 26 ord(A)) 题目来自【牛客】 # 加密 def encrypt_string(s):result ""for ch…

C++的AVL树

目录 基本概念 插入的语言分析 LL右旋 RR左旋 额外结论及问题1 LR左右旋 RL右左旋 额外结论及问题2 插入结点 更新bf与判断旋转方式 旋转代码实现 准备工作一 LL右旋的实现 RR左旋的实现 准备工作二 LR左右旋的实现 RL右左旋的实现 完整代码 基本概念 1、…

抖音小店新规重磅来袭!事关店铺流量!商家的福音来了?

大家好&#xff0c;我是喷火龙。 就在前两天&#xff0c;抖店发布了新规&#xff0c;我给大家总结了一下&#xff0c;无非就是两点。 第一点&#xff1a;保证金下调&#xff0c;一证开多店。 第二点&#xff1a;新品上架破10单&#xff0c;有流量扶持。 咱来细细的解读&…

无人机助力光伏项目测绘建模

随着全球对可再生能源需求的不断增长&#xff0c;光伏项目作为其中的重要一环&#xff0c;其建设规模和速度都在不断提高。在这一背景下&#xff0c;如何高效、准确地完成光伏项目的测绘与建模工作&#xff0c;成为了行业发展的重要课题。近年来&#xff0c;无人机技术的快速发…

【产品经理】如何培养对市场的洞察力

引言&#xff1a;        在最近频繁的产品管理职位面试中&#xff0c;我深刻体会到了作为产品经理需要的不仅仅是对市场和技术的敏锐洞察&#xff0c;更多的是在复杂多变的环境中&#xff0c;如何运用沟通、领导力和决策能力来引导产品从概念走向市场。这一系列博客将分享…

技术驱动未来,全面揭秘 Sui 的生态发展和布局

在不到一年的时间里&#xff0c;由 Mysten Labs 团队创立的 Layer1 区块链 Sui 迅速崛起&#xff0c;成功跃升至去中心化金融&#xff08;DeFi&#xff09;的前十名。根据 DeFi Llama 的数据&#xff0c;Sui的总锁定价值&#xff08;TVL&#xff09;在短短四个月内增长超过 100…

堆的实现

前言&#xff1a;本文讲述堆实现的几个难点&#xff0c;注意本文主要是以实现为主&#xff0c;建议有些基本概念认识的人阅读。 目录 1.堆 2.堆的实现 堆结构的定义&#xff1a; 要实现的接口&#xff1a; 接口的实现&#xff1a; 堆的初始化和销毁&#xff1a; 向堆中插…

【简单介绍下链表基础知识】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…

第八届能源、环境与材料科学国际学术会议(EEMS 2024)

文章目录 一、重要信息二、大会简介三、委员会四、征稿主题五、论文出版六、会议议程七、出版信息八、征稿编辑 一、重要信息 会议官网&#xff1a;http://ic-eems.com主办方&#xff1a;常州大学大会时间&#xff1a;2024年06月7-9日大会地点&#xff1a;新加坡 Holiday Inn …

OA界面这么香吗?总有老铁私信,让我多发点,他好参考。

OA的确是B端系统应用最为广泛的一种&#xff0c;这次再给大家分享十来个页面&#xff0c;希望对他们的界面提升有所帮助。 举报 评论 3

计算机考研|408开始的晚,怎么入手复习?六个月保姆级规划

万事开头难&#xff0c;特别是408 大家在第一遍复习408的时候&#xff0c;基本上都有这个问题&#xff0c;就是复习速度慢&#xff0c;理解成本高&#xff0c;因为数据结构&#xff0c;计算机组成原理这些都是大一大二开始学的内容&#xff0c;等到自己准备考研的时候&#xf…

洗地机哪个牌子最好用?2024洗地机排行榜

随着人们生活水平的提升&#xff0c;智能清洁家电已经成为日常生活中的必需品。如今的清洁家电市场上&#xff0c;洗地机、吸尘器和扫地机器人等设备各有其独特的功能和优势。洗地机结合了扫、拖、吸和自清洁等多种功能&#xff0c;不仅可以处理干湿垃圾&#xff0c;还能高效清…

打破传统相亲模式,这几款靠谱的相亲软件助你脱单

相亲软件在当今社会已经变得越来越普遍&#xff0c;市面上有众多相亲软件可供选择&#xff0c;但哪些相亲软件好用呢&#xff1f;下面介绍几款备受好评的相亲软件&#xff0c;帮助你在茫茫人海中找到那个对的人&#xff01; 1、一伴婚恋 这个APP它最大的优点就是信息真实靠谱…

mac上简单实现一个java调用C接口的JNI

目录 安装JDK及配置环境变量写Java代码生成头文件实现本地方法编译本地代码运行 Java 程序总结步骤 安装JDK及配置环境变量 参考&#xff1a;MAC系统安装JDK1.8及环境变量配置 写Java代码 // 文件名&#xff1a;Calculator.java public class Calculator {// 声明本地方法pu…

Vue 3指令与事件处理

title: Vue 3指令与事件处理 date: 2024/5/25 18:53:37 updated: 2024/5/25 18:53:37 categories: 前端开发 tags: Vue3基础指令详解事件处理高级事件实战案例最佳实践性能优化 第1章 Vue 3基础 1.1 Vue 3简介 Vue 3 是一个由尤雨溪&#xff08;尤大&#xff09;领导的开源…

5.22-wjn

使用select实现的TCP并发服务器端 #define SER_PORT 8888 #define SER_IP "192.168.125.158" int main(int argc, const char *argv[]) {//1、为通信创建一个端点int sfd socket(AF_INET, SOCK_STREAM, 0);//参数1&#xff1a;说明使用的是ipv4通信域//参数2&#…

SpringBoot Bean

配置优先级 Bean的管理 从IOC容器中获取Bean对象&#xff1a;注入IOC容器对象 bean的作用域 Bean对象默认在容器启动时实例化 Lazy在第一次使用时初始化 Bean的管理&#xff1a;第三方Bean 引入依赖&#xff0c;每次解析创建新对象&#xff0c;浪费资源 将第三方对象交给…