Netty 入门学习

前言

学习Spark源码绕不开通信,Spark通信是基于Netty实现的,所以先简单学习总结一下Netty。

Spark 通信历史

最开始: Akka
Spark 1.3: 开始引入Netty,为了解决大块数据(如Shuffle)的传输问题
Spark 1.6:支持配置使用 Akka 或者 Netty
Spark 2:完全废弃Akka,全部使用Netty

Akka 是一个用 Scala 编写的库,用于简化编写容错的、高可伸缩性的 Java 和 Scala 的 Actor 模型应用。
Spark 借鉴Akka 通过 Netty 实现了类似的简约版的Actor 模型

Netty

Server 主要代码:

// 创建ServerBootstrap实例,服务器启动对象
ServerBootstrap bootstrap = new ServerBootstrap();

ChannelFuture channelFuture = bootstrap.bind(8888).sync();
// 等待服务器关闭
channelFuture.channel().closeFuture().sync();

主要是启动 ServerBootstrap、绑定端口、等待关闭。

Client 主要代码:

// 创建Bootstrap实例,客户端启动对象
Bootstrap bootstrap = new Bootstrap();
// 连接服务端
ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();

Server 添加 Handler

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new ServerHandler());
    }
});
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline().addLast(new ClientHandler());
    }
});

这里的 ServerHandler 和 ClientHandler 都是自己实现的类,处理具体的逻辑。
如channelActive 建立连接时发消息给服务器,channelRead 读取数据时调用,处理读取数据的逻辑。给服务器或者客户端发消息可以用 writeAndFlush 方法。

完整代码

地址:https://gitee.com/dongkelun/java-learning/tree/master/netty-learning/src/main/java/com/dkl/java/demo

NettyServer

package com.dkl.java.demo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) {
        try {
            bind();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void bind() throws InterruptedException {

        // 创建boss线程组,用于接收连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 创建worker线程组,用于处理连接上的I/O操作,含有子线程NioEventGroup个数为CPU核数大小的2倍
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 创建ServerBootstrap实例,服务器启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();

            // 使用链式编程配置参数
            // 将boss线程组和worker线程组暂存到ServerBootstrap
            bootstrap.group(bossGroup, workerGroup);
            // 设置服务端Channel类型为NioServerSocketChannel作为通道实现
            bootstrap.channel(NioServerSocketChannel.class);

            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // 添加ServerHandler到ChannelPipeline,对workerGroup的SocketChannel(客户端)设置处理器
                    socketChannel.pipeline().addLast(new ServerHandler());
                }
            });
            // 设置启动参数,初始化服务器连接队列大小。服务端处理客户端连接请求是顺序处理,一个时间内只能处理一个客户端请求
            // 当有多个客户端同时来请求时,未处理的请求先放入队列中
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            // 绑定端口并启动服务器,bind方法是异步的,sync方法是等待异步操作执行完成,返回ChannelFuture异步对象
            ChannelFuture channelFuture = bootstrap.bind(8888).sync();
            // 等待服务器关闭
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅地关闭boss线程组
            bossGroup.shutdownGracefully();
            // 优雅地关闭worker线程组
            workerGroup.shutdownGracefully();
        }
    }
}

ServerHandler

package com.dkl.java.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当 Channel 已经注册到它的 EventLoop 并且能够处理 I/O 时被调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("执行 channelRegistered");
    }

    /**
     * 当 Channel 从它的 EventLoop 注销并且无法处理任何 I/O 时被调
     * 用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("执行 channelUnregistered");
    }

    /**
     * 当 Channel 处于活动状态时被调用;Channel 已经连接/绑定并且已经就绪
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("执行 channelActive");
    }

    /**
     * 当 Channel 离开活动状态并且不再连接它的远程节点时被调用
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("执行 channelInactive");
    }

    /**
     * 当从 Channel 读取数据时被调用
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        System.out.println("执行 channelRead");
        // 处理接收到的数据
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // 将接收到的字节数据转换为字符串
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            // 打印接收到的消息
            System.out.println("Server端收到客户消息: " + message);
            // 发送响应消息给客户端
            ctx.writeAndFlush(Unpooled.copiedBuffer("我是服务端,我收到你的消息啦~", CharsetUtil.UTF_8));
        } finally {
            // 释放ByteBuf资源
            ReferenceCountUtil.release(byteBuf);
        }

    }

    /**
     * 当 Channel 上的一个读操作完成时被调用,对通道的读取完成的事件或通知。当读取完成可通知发送方或其他的相关方,告诉他们接受方读取完成
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.out.println("执行 channelReadComplete");
    }

    /**
     * 当 ChannelnboundHandler.fireUserEventTriggered()方法被调用时被
     * 调用
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("执行 userEventTriggered");
    }

    /**
     * 当 Channel 的可写状态发生改变时被调用。可以通过调用 Channel 的 isWritable()方法
     * * 来检测 Channel 的可写性。与可写性相关的阈值可以通过
     * * Channel.config().setWriteHighWaterMark()和 Channel.config().setWriteLowWaterMark()方法来
     * * 设置
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        System.out.println("执行 channelWritabilityChanged");
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("执行 exceptionCaught");
        // 异常处理
        cause.printStackTrace();
        ctx.close();
    }
}

NettyClient

package com.dkl.java.demo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {

    public static void main(String[] args) {
        start();
    }

    public static void start() {

        // 创建EventLoopGroup,用于处理客户端的I/O操作
        EventLoopGroup groupThread = new NioEventLoopGroup();

        try {
            // 创建Bootstrap实例,客户端启动对象
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(groupThread);
            // 设置服务端Channel类型为NioSocketChannel作为通道实现
            bootstrap.channel(NioSocketChannel.class);
            // 设置客户端处理
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ClientHandler());
                }
            });
            // 绑定端口
            ChannelFuture channelFuture = bootstrap.connect("localhost", 8888).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            // 优雅地关闭线程
            groupThread.shutdownGracefully();
        }
    }
}

ClientHandler

package com.dkl.java.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 连接建立时的处理,发送请求消息给服务器
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,服务端!我是客户端,测试通道连接", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 处理接收到的数据
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            // 将接收到的字节数据转换为字符串
            String message = byteBuf.toString(CharsetUtil.UTF_8);
            // 打印接收到的消息
            System.out.println("受到服务端响应的消息: " + message);

            // TODO: 对数据进行业务处理

        } finally {
            // 释放ByteBuf资源
            ReferenceCountUtil.release(byteBuf);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 异常处理
        cause.printStackTrace();
        ctx.close();
    }
}

运行截图


handler 执行顺序

Server 端

连接时:

执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete

断开连接时:

执行 channelReadComplete
(强制中断 Client 连接
执行 exceptionCaught
执行 userEventTriggered (exceptionCaught 中 ctx.close()) 触发
)
执行 channelInactive
执行 channelUnregistered

channelReadComplete 中 ctx.close(); 触发:
执行 channelInactive
执行 channelUnregistered

Client 端

执行 channelRegistered
执行 channelActive
执行 channelRead
执行 channelReadComplete

Spark 对应位置

  • Spark版本:3.2.3
  • Server: org.apache.spark.network.server.TransportServer.init
  • Client: org.apache.spark.network.client.TransportClientFactory.createClient


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/953760.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鸿蒙报错Init keystore failed: keystore password was incorrect

报错如下&#xff1a; > hvigor ERROR: Failed :entry:defaultSignHap... > hvigor ERROR: Tools execution failed. 01-13 16:35:55 ERROR - hap-sign-tool: error: Init keystore failed: keystore password was incorrect * Try the following: > The key stor…

IDEA的Git界面(ALT+9)log选项不显示问题小记

IDEA的Git界面ALT9 log选项不显示问题 当前问题idea中log界面什么都不显示其他选项界面正常通过命令查询git日志正常 预期效果解决办法1. 检查 IDEA 的 Git 设置2. 刷新 Git Log (什么都没有大概率是刷新不了)3. 检查分支和日志是否存在4. 清理 IDEA 缓存 (我用这个成功解决)✅…

ffmpeg硬件编码

使用FFmpeg进行硬件编码可以显著提高视频编码的性能&#xff0c;尤其是在处理高分辨率视频时。硬件编码利用GPU或其他专用硬件&#xff08;如Intel QSV、NVIDIA NVENC、AMD AMF等&#xff09;来加速编码过程。以下是使用FFmpeg进行硬件编码的详细说明和示例代码。 1. 硬件编码支…

65.在 Vue 3 中使用 OpenLayers 绘制带有箭头的线条

前言 在现代的前端开发中&#xff0c;地图已经成为许多项目的核心功能之一。OpenLayers 是一个强大的开源地图库&#xff0c;它提供了丰富的功能和高度的定制化支持。在本篇文章中&#xff0c;我将向大家展示如何在 Vue 3 中使用 OpenLayers 绘制带有箭头的线条。 我们将实现…

C++内存泄露排查

内存泄漏是指程序动态分配的内存未能及时释放&#xff0c;导致系统内存逐渐耗尽&#xff0c;最终可能造成程序崩溃或性能下降。在C中&#xff0c;内存泄漏通常发生在使用new或malloc等分配内存的操作时&#xff0c;但没有正确地使用delete或free来释放这块内存。 在日常开发过程…

Ubuntu上,ffmpeg如何使用cuda硬件解码、编码、转码加速

本文使用 Ubuntu 环境。Ubuntu 直接使用 APT 安装的就支持 CUDA 加速。本文使用这样下载的版本进行演示&#xff0c;你自己编译或者其他源的版本可能会不同。 ffmpeg 的一些介绍&#xff0c;以及 macOS 版本的 ffmpeg 硬件加速请见《macOS上如何安装&#xff08;不需要编译安装…

linux: 文本编辑器vim

文本编辑器 vi的工作模式 (vim和vi一致) 进入vim的方法 方法一:输入 vim 文件名 此时左下角有 "文件名" 文件行数,字符数量 方法一: 输入 vim 新文件名 此时新建了一个文件并进入vim,左下角有 "文件名"[New File] 灰色的长方形就是光标,输入文字,左下…

调用企业微信新建日程 API 报 api forbidden 的解决方案

报错详细信息&#xff1a; {"errcode":48002,"errmsg":"api forbidden, hint: [1266719663513970651415782], from ip: xxx.xxx.xxx.xxx, more info at https://open.work.weixin.qq.com/devtool/query?e48002" } 解决方案&#xff1a; 1. 登…

rtthread学习笔记系列(4/5/6/7/15/16)

文章目录 4. 杂项4.1 检查是否否是2的幂 5. 预编译命令void类型和rt_noreturn类型的区别 6.map文件分析7.汇编.s文件7.1 汇编指令7.1.1 BX7.1.2 LR链接寄存器7.1.4 []的作用7.1.4 简单的指令 7.2 MSR7.3 PRIMASK寄存器7.4.中断启用禁用7.3 HardFault_Handler 15 ARM指针寄存器1…

微软与腾讯技术交锋,TRELLIS引领3D生成领域多格式支持新方向

去年 11 月&#xff0c;腾讯推出 Hunyuan3D 生成模型&#xff0c;是业界首个同时支持文字和图像生成 3D 的开源大模型。紧接着不到一个月&#xff0c;微软便发布了全新框架 TRELLIS&#xff0c;加入 3D 资产生成领域的竞争中。TRELLIS 支持多格式输出&#xff0c;包括辐射场、3…

【C++】类与对象(中上)(难点部分)

目录 &#x1f495;1.类的默认成员函数 &#x1f495;2.构造函数 &#x1f495;3.析构函数 &#x1f495;4.缺省值 &#x1f495;5.拷贝构造函数 &#xff08;最新更新时间——2025.1.14&#xff09; 这世间没有绝境 只有对处境绝望的人 &#x1f495;1.类的默认成员函数 默…

Apache Hop从入门到精通 第三课 Apache Hop下载安装

1、下载 官方下载地址&#xff1a;https://hop.apache.org/download/&#xff0c;本教程是基于apache-hop-client-2.11.0.zip进行解压&#xff0c;需要jdk17&#xff0c;小伙伴们可以根据自己的需求下载相应的版本。如下图所示 2、下载jdk17&#xff08;https://www.microsoft…

springboot房屋租赁管理系统

Spring Boot房屋租赁管理系统是一种基于Spring Boot框架构建的&#xff0c;旨在解决传统租房市场中房源信息更新不及时、虚假信息泛滥、交易流程繁琐等问题的信息化解决方案。 一、系统背景与目的 随着城市化进程的加快和人口流动性的增强&#xff0c;租房市场需求急剧增长。…

计算机网络 (35)TCP报文段的首部格式

前言 计算机网络中的TCP&#xff08;传输控制协议&#xff09;报文段的首部格式是TCP协议的核心组成部分&#xff0c;它包含了控制TCP连接的各种信息和参数。 一、TCP报文段的结构 TCP报文段由首部和数据两部分组成。其中&#xff0c;首部包含了控制TCP连接的各种字段&#xff…

鸿蒙-页面和自定义组件生命周期

页面生命周期&#xff0c;即被Entry装饰的组件生命周期&#xff0c;提供以下生命周期接口&#xff1a; onPageShow&#xff1a;页面每次显示时触发一次&#xff0c;包括路由过程、应用进入前台等场景。onPageHide&#xff1a;页面每次隐藏时触发一次&#xff0c;包括路由过程、…

道旅科技借助云消息队列 Kafka 版加速旅游大数据创新发展

作者&#xff1a;寒空、横槊、娜米、公仪 道旅科技&#xff1a;科技驱动&#xff0c;引领全球旅游分销服务 道旅科技 &#xff08;https://www.didatravel.com/home&#xff09; 成立于 2012 年&#xff0c;总部位于中国深圳&#xff0c;是一家以科技驱动的全球酒店资源批发商…

【HarmonyOS NEXT】鸿蒙跳转华为应用市场目标APP下载页

【HarmonyOS NEXT】鸿蒙跳转华为应用市场目标APP下载页 一、问题背景&#xff1a; 如今&#xff0c;大家都离不开各种手机应用。随着鸿蒙系统用户越来越多&#xff0c;大家都希望能在鸿蒙设备上快速找到想用的 APP。华为应用市场里有海量的 APP&#xff0c;但之前从鸿蒙设备进…

JavaScript动态渲染页面爬取之Splash

Splash是一个 JavaScript渲染服务,是一个含有 HTTP API的轻量级浏览器,它还对接了 Python 中的 Twisted 库和 OT库。利用它&#xff0c;同样可以爬取动态渲染的页面。 功能介绍 利用 Splash&#xff0c;可以实现如下功能&#xff1a; 异步处理多个网页的渲染过程:获取渲染后…

Thrustmaster Hotas Warthog飞行操作杆开发

目录 0 摘 要 &#xff1a;简单说一下这篇文章在搞啥 1 背 景 &#xff1a;什么需求以及对开发的背景调查 2 环境配置 &#xff1a;具体需要什么环境&#xff0c;对软件层面的需求 3 硬件测试 &#xff1a;测试遥感器…

算法-查找数组对角线上最大的质数

力扣题目&#xff1a;2614. 对角线上的质数 - 力扣&#xff08;LeetCode&#xff09; 给你一个下标从 0 开始的二维整数数组 nums 。 返回位于 nums 至少一条 对角线 上的最大 质数 。如果任一对角线上均不存在质数&#xff0c;返回 0 。 注意&#xff1a; 如果某个整数大于…