一、服务端代码示例
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class AdvancedAsyncTCPServer {
private static final int PORT = 8888;
private static final int BUFFER_SIZE = 1024;
private final AsynchronousServerSocketChannel serverSocketChannel;
private final ExecutorService threadPool;
public AdvancedAsyncTCPServer() throws IOException {
// 创建异步服务器套接字通道
serverSocketChannel = AsynchronousServerSocketChannel.open();
// 绑定到指定端口
serverSocketChannel.bind(new InetSocketAddress(PORT));
// 创建一个固定大小的线程池,用于处理业务逻辑
threadPool = Executors.newFixedThreadPool(10);
System.out.println("Server started on port " + PORT);
}
public void start() {
// 开始接受客户端连接
acceptConnections();
}
private void acceptConnections() {
// 异步接受客户端连接
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
// 继续接受下一个连接
acceptConnections();
// 处理新连接
handleConnection(clientChannel);
}
@Override
public void failed(Throwable exc, Void attachment) {
System.err.println("Failed to accept connection: " + exc.getMessage());
}
});
}
private void handleConnection(AsynchronousSocketChannel clientChannel) {
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
// 异步读取客户端数据
clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer bytesRead, ByteBuffer buffer) {
if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String message = new String(data);
System.out.println("Received message from client: " + message);
// 使用线程池处理业务逻辑
threadPool.submit(() -> {
try {
// 模拟业务处理
String responseMessage = "Server processed: " + message;
ByteBuffer responseBuffer = ByteBuffer.wrap(responseMessage.getBytes());
// 异步发送响应给客户端
clientChannel.write(responseBuffer, responseBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer bytesWritten, ByteBuffer buffer) {
System.out.println("Response sent to client");
try {
// 继续读取客户端数据
buffer.clear();
clientChannel.read(buffer, buffer, this);
} catch (Exception e) {
closeChannel(clientChannel);
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
System.err.println("Failed to send response: " + exc.getMessage());
closeChannel(clientChannel);
}
});
} catch (Exception e) {
closeChannel(clientChannel);
}
});
} else if (bytesRead == -1) {
// 客户端关闭连接
closeChannel(clientChannel);
} else {
// 继续读取客户端数据
buffer.clear();
clientChannel.read(buffer, buffer, this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer buffer) {
System.err.println("Failed to read data: " + exc.getMessage());
closeChannel(clientChannel);
}
});
}
private void closeChannel(AsynchronousSocketChannel channel) {
try {
System.out.println("Closing client connection");
channel.close();
} catch (IOException e) {
System.err.println("Error closing channel: " + e.getMessage());
}
}
public void stop() {
try {
// 关闭服务器套接字通道
serverSocketChannel.close();
// 关闭线程池
threadPool.shutdown();
} catch (IOException e) {
System.err.println("Error stopping server: " + e.getMessage());
}
}
public static void main(String[] args) {
try {
AdvancedAsyncTCPServer server = new AdvancedAsyncTCPServer();
server.start();
} catch (IOException e) {
System.err.println("Error starting server: " + e.getMessage());
}
}
}
二、代码分析
AdvancedAsyncTCPServer
类:
-
构造函数:创建
AsynchronousServerSocketChannel
并绑定到指定端口,同时创建一个固定大小的线程池用于处理业务逻辑。 start()
方法:开始接受客户端连接。acceptConnections()
方法:异步接受客户端连接,使用CompletionHandler
处理连接结果。handleConnection()
方法:处理新连接,异步读取客户端数据,并使用线程池处理业务逻辑。closeChannel()
方法:关闭客户端通道。stop()
方法:关闭服务器套接字通道和线程池。
CompletionHandler
:
- 用于处理异步操作的完成结果,包括连接、读取和写入操作。
- 在
completed()
方法中处理成功的操作,在failed()
方法中处理失败的操作。
线程池:
-
使用
Executors.newFixedThreadPool(10)
创建一个固定大小的线程池,用于处理业务逻辑,避免阻塞 I/O 操作。
三、优点
- 异步 I/O:使用 Java NIO 2 的异步 I/O 功能,提高了服务器的并发处理能力。
- 线程池:使用线程池处理业务逻辑,避免了创建过多线程导致的性能问题。
- 异常处理:对各种异常情况进行了处理,提高了代码的健壮性。
- 资源管理:在关闭服务器时,正确关闭服务器套接字通道和线程池,避免资源泄漏。