基于JavaSocket重写Dubbo网络传输层

前言

我们知道,位于 Serialize 层上面的是负责网络传输的 Transport 层,它负责调用编解码器 Codec2 把要传输的对象编码后传输、再对接收到的字节序列解码。

站在客户端的角度,一次 RPC 调用的流程大概是这样的:

  • Invoker 发起 RPC 调用请求
  • Exchange 层负责数据交换,实现 Request-Response 语义
  • Transport 层调用编码器对 Request 编码后发送,主线程阻塞等待
  • IO 线程读取到服务端响应的数据,解码器解码后得到结果,唤醒主线程

image.png
清楚这个流程之后,我们尝试把 Dubbo 默认用 Netty 实现的传输层替换成我们自己实现的。
特别声明:Netty 已经做的足够好了,我们这么做并没有什么意义,只是为了加深你对 Dubbo 传输层工作流程的理解。

自定义Transport

新建一个模块dubbo-extension-transport-javasocket用来封装我们自己的传输层实现。
因为要写的是 Dubbo 传输层的一个实现策略,所以要依赖dubbo-remoting-api

<dependencies>
    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-remoting-api</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
</dependencies>

Transporter

传输层的核心 SPI 接口是 org.apache.dubbo.remoting.Transporter,我们自己实现一个。
JavaSocketTransporter 的核心是:

  • Dubbo 开启服务时创建 JavaSocketServer
  • 客户端和服务端建立连接时创建 JavaSocketClient
public class JavaSocketTransporter implements Transporter {

    @Override
    public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
        return new JavaSocketServer(url, handler);
    }

    @Override
    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new JavaSocketClient(url, handler);
    }
}

Channel

Dubbo 抽象了 org.apache.dubbo.remoting.Channel 接口来表示一个 tcp 连接,我们用的 Java Socket 实现,对应的类是 java.nio.channels.SocketChannel。但是我们要写一个类来把 SocketChannel 适配成 Dubbo 的 Channel。
Java SocketChannel 并不支持维护属性,Dubbo Channel 是支持的,所以我们专门搞个 Map 记录一下。

public class JavaSocketChannel extends AbstractChannel {

    private static final ConcurrentMap<SocketChannel, JavaSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
    private final SocketChannel socketChannel;
    private final Codec2 codec;
    private final Map<String, Object> attributes = new ConcurrentHashMap<>();

    private JavaSocketChannel(Codec2 codec, SocketChannel sc, URL url, ChannelHandler handler) {
        super(url, handler);
        this.codec = codec;
        this.socketChannel = sc;
    }

    static JavaSocketChannel getOrAddChannel(Codec2 codec, SocketChannel sc, URL url, ChannelHandler handler) {
        JavaSocketChannel javaSocketChannel = CHANNEL_MAP.get(sc);
        if (javaSocketChannel == null) {
            javaSocketChannel = new JavaSocketChannel(codec, sc, url, handler);
        }
        return javaSocketChannel;
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        try {
            return (InetSocketAddress) socketChannel.getRemoteAddress();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean isConnected() {
        return socketChannel.isConnected();
    }

    @Override
    public boolean hasAttribute(String key) {
        return attributes.containsKey(key);
    }

    @Override
    public Object getAttribute(String key) {
        return attributes.get(key);
    }

    @Override
    public void setAttribute(String key, Object value) {
        attributes.put(key, value);
    }

    @Override
    public void removeAttribute(String key) {
        attributes.remove(key);
    }

    @Override
    public InetSocketAddress getLocalAddress() {
        try {
            return (InetSocketAddress) socketChannel.getLocalAddress();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        super.send(message, sent);
        try {
            ChannelBuffer channelBuffer = ChannelBuffers.directBuffer(1024);
            codec.encode(this, channelBuffer, message);
            socketChannel.write(channelBuffer.toByteBuffer());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

RemotingServer

再看服务端,Dubbo 提供了抽象类 org.apache.dubbo.remoting.transport.AbstractServer 实现了一些通用的逻辑,我们的 JavaSocketServer 直接继承它即可。
Dubbo 启动暴露服务时,会一并开启我们给定的 JavaSocketServer,方法是doOpen
JavaSocketServer 核心是:

  • 通过 ServerSocketChannel 绑定本地端口来开启一个服务
  • 处理客户端连接
  • 网络IO数据读取、解码
public class JavaSocketServer extends AbstractServer {

    private ServerSocketChannel serverSocketChannel;
    private Map<SocketChannel, JavaSocketChannel> channelMap;

    public JavaSocketServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    @Override
    protected void doOpen() throws Throwable {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.bind(getBindAddress());
        serverSocketChannel.configureBlocking(false);
        channelMap = new ConcurrentHashMap<>();
        // 开启一个线程来读网络数据
        new Thread(() -> {
            try {
                while (true) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel != null) {
                        // 建立新连接,丢到Map
                        socketChannel.configureBlocking(false);
                        channelMap.put(socketChannel, JavaSocketChannel.getOrAddChannel(getCodec(), socketChannel, getUrl(), getChannelHandler()));
                    }
                    // 遍历所有连接,看看是否有数据可读
                    for (Map.Entry<SocketChannel, JavaSocketChannel> entry : channelMap.entrySet()) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
                        int length = entry.getKey().read(byteBuffer);
                        if (length > 0) {
                            byteBuffer.flip();
                            // 读到新数据,尝试解码,交给后续handler处理
                            Object decode = getCodec().decode(entry.getValue(), ChannelBuffers.wrappedBuffer(byteBuffer));
                            if (decode != null) {
                                getDelegateHandler().received(entry.getValue(), decode);
                            }
                        }
                    }
                    Thread.sleep(10);// sleep一会,避免空转
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    @Override
    protected void doClose() throws Throwable {
        if (serverSocketChannel != null) {
            serverSocketChannel.close();
        }
    }

    @Override
    public boolean isBound() {
        return serverSocketChannel.isOpen();
    }

    @Override
    public Collection<Channel> getChannels() {
        return null;
    }

    @Override
    public Channel getChannel(InetSocketAddress remoteAddress) {
        return null;
    }
}

Client

最后是客户端,Dubbo 提供了抽象类 org.apache.dubbo.remoting.transport.AbstractClient 实现了一些通用逻辑,我们的 JavaSocketClient 也直接继承它即可。
JavaSocketClient 核心是:

  • 和服务端建立连接
  • Request 对象编码后发送
  • 读取服务端响应的数据、解码
public class JavaSocketClient extends AbstractClient {

    private SocketChannel socketChannel;

    public JavaSocketClient(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    @Override
    protected void doOpen() throws Throwable {
        socketChannel = SocketChannel.open();
        System.err.println("client open");
    }

    @Override
    protected void doClose() throws Throwable {

    }

    @Override
    protected void doConnect() throws Throwable {
        System.err.println("client connet");
        if (socketChannel.connect(getConnectAddress())) {
            new Thread(() -> {
                try {
                    while (true) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
                        int length = socketChannel.read(byteBuffer);
                        if (length > 0) {
                            byteBuffer.flip();
                            Object decode = getCodec().decode(getChannel(), ChannelBuffers.wrappedBuffer(byteBuffer));
                            getDelegateHandler().received(getChannel(), decode);
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    @Override
    protected void doDisConnect() throws Throwable {

    }

    @Override
    protected Channel getChannel() {
        return JavaSocketChannel.getOrAddChannel(getCodec(), socketChannel, getUrl(), getChannelHandler());
    }

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        try {
            ChannelBuffer channelBuffer = ChannelBuffers.directBuffer(1024);
            getCodec().encode(getChannel(), channelBuffer, message);
            socketChannel.write(channelBuffer.toByteBuffer());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

至此,自定义的传输层逻辑就写完了。接下来是让 Dubbo 加载并使用我们自定义的实现,可以通过 SPI 机制。
创建META-INF/dubbo/org.apache.dubbo.remoting.Transporter文件,编写内容:

javasocket=dubbo.extension.remoting.transport.javasocket.JavaSocketTransporter

服务端可以在 ProtocolConfig 里指定:

ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", 20880);
protocolConfig.setServer("javasocket");

客户端可以在 ReferenceConfig 里配置参数指定:

Map<String, String> parameters = new HashMap<>();
parameters.put("client", "javasocket");

ReferenceConfig.setParameters(parameters);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/330151.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JMeter请求参数Parameters,带中文或特殊字符(+/=)时,例如登录密码或者token等,需要勾选编码

以前的登录接口密码参数不包含特殊字符&#xff0c;为了安全&#xff0c;产品今天修改了需求&#xff0c;密码必须由数字&#xff0c;字母和特殊字符构成&#xff0c;之前利用JMeter接口编写的脚本报错了&#xff0c;调整了一下&#xff0c;里面踩了一点坑&#xff0c;记录下来…

AM5-DB低压备自投装置在河北冠益荣信科技公司洞庭变电站工程中的应用

摘 要&#xff1a;随着电力需求的不断增加&#xff0c;电力系统供电可靠性要求越来越高&#xff0c;许多供电系统已具备两回或多回供电线路。备用电源自动投入装置可以有效提高供电的可靠性&#xff0c;该类装置能够在工作电源因故障断开后&#xff0c;自动且迅速地将备用电源投…

SpringMVC JSON数据处理见解6

6.JSON数据处理 6.1.添加json依赖 springmvc 默认使用jackson作为json类库,不需要修改applicationContext-servlet.xml任何配置&#xff0c;只需引入以下类库springmvc就可以处理json数据&#xff1a; <!--spring-json依赖--> <dependency><groupId>com.f…

react umi/max 封装页签组件

1. models/tabs // 全局共享数据示例 import { useState } from react;const useUser () > {const [items, setItems] useState<any[]>([]); // 页签的全局Item数据const [key, setKey] useState<string>(/home); // 页签的高亮Keyreturn {items,setItems…

Alinx ZYNQ 7020 LED调试--in RAM

设置拨码开关为JTAG方式 烧写LED bit stream a. 点击“Program device”烧录程序到FPGA中&#xff08;重新上电程序就丢失了&#xff09; b. /01_led/led.runs/impl_1/led.bit 程序烧录到Flash中 ZYNQ与以往的直接烧录Flash不同&#xff0c;首先必须PS&#xff0c;然后烧…

C语言总结十二:文件操作详细总结

在操作系统中&#xff0c;为了统一对各种硬件的操作&#xff0c;简化接口&#xff0c;不同的硬件设备也都被看成一个文件。对这些文件的操作&#xff0c;等同于对磁盘上普通文件的操作。我们不去探讨硬件设备是如何被映射成文件的&#xff0c;把任意 I/O 设备&#xff0c;转换成…

边缘计算AI智能分析网关V4客流统计算法的概述

客流量统计AI算法是一种基于人工智能技术的数据分析方法&#xff0c;通过机器学习、深度学习等算法&#xff0c;实现对客流量的实时监测和统计。该算法主要基于机器学习和计算机视觉技术&#xff0c;其基本流程包括图像采集、图像预处理、目标检测、目标跟踪和客流量统计等步骤…

HTML快速上手

前腰&#xff1a;本文只是概括重要的 html 标签&#xff0c;这些标签的使用频率较高&#xff0c;更多标签相关的资源您可以跳转 Mmdn 进行深入的学习。 1.HTML 基础 就其核心而言&#xff0c;HTML 是一种相当简单的、由不同 元素 组成的标记语言&#xff0c;它可以被应用于文本…

一款基于Frida的Android- SO动态库逆向命令行工具

前言 YJ是一款基于Frida框架的款Native层逆向分析的交互式工具&#xff0c;就像在GUN-LINUX上使用GDB工具一样&#xff0c;设计YJ的灵感来自GNU-GDB调试工具&#xff0c;它通过交互命令模式轻松地向展示你想要窥探的内存数据 Frida是一个底层hook工具及框架。提供了hook工具的…

防火墙如何处理nat(私网用户访问Internet场景)

目录 私网用户访问Internet场景源NAT的两种转换方式NAT No-PAT NAPT配置思路规划 NAPT配置命令配置接口IP地址并将接口加入相应安全区域配置安全策略配置NAT地址池配置源NAT策略配置缺省路由配置黑洞路由 私网用户访问Internet场景 多个用户共享少量公网地址访问Internet的时候…

CAN记录仪在矿卡中的应用

CAN数据记录仪在矿卡中主要用于记录和监控车辆的运行数据&#xff0c;以保障安全和提高运营效率。那么就需要记录整车数据来进行车辆诊断分析&#xff0c;查找问题解决问题。 CAN数据记录仪可以记录矿卡的各种运行参数&#xff0c;如发动机转速、车速、制动状态、转向状态、油…

首届PolarDB开发者大会在京举办,阿里云李飞飞:云数据库加速迈向智能化

1月17日&#xff0c;阿里云PolarDB开发者大会在京举办&#xff0c;中国首款自研云原生数据库PolarDB发布“三层分离”新版本&#xff0c;基于智能决策实现查询性能10倍提升、节省50%成本。此外&#xff0c;阿里云全新推出数据库场景体验馆、训练营等系列新举措&#xff0c;广大…

Git项目分支管理规范

一、分支管理 创建项目时&#xff0c;会针对不同环境创建两个常设分支(也可以算主分支&#xff0c;永久不会删除) master&#xff1a;生产环境的稳定分支&#xff0c;生产环境基于该分支构建。仅用来发布新版本&#xff0c;除了从release测试分支或 hotfix-*Bug修复分支进行m…

redis数据安全(四)复制

关系数据库通常会使用一个主服务器向多个从服务器发送更新&#xff0c;并使用从服务器来处理所有读请求&#xff0c;Redis也采用了同样的方法来实现自己的复制特性&#xff0c;并将其用做扩展性能的一种手段。 一、特点&#xff1a; 1、异步复制&#xff1a;Redis默认使用的是…

Mysql 数据库DML 数据操作语言—— 对数据库表中的数据进行增删改

DML&#xff1a;数据操作语言&#xff0c;用来对数据库表中的数据进行增删改 前提&#xff0c;数据库里面有一张表&#xff0c;具体如何创建&#xff0c;请看上篇文章 1、增添数据 1.1、给指定字段增添数据 insert into tt4 (name,age) values (张三,18); 1.2、给全部字段添…

使用Markdown编辑器

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

k8s集群环境搭建以及插件安装

前置条件 终端工具MobaXterm很好用。 1、虚拟机三台&#xff08;ip按自己的网络环境相应配置&#xff09;(master/node) 节点ipk8s-master192.168.200.150k8s-node1192.168.200.151k8s-node2192.168.200.152 2、关闭防火墙(master/node) systemctl stop firewalld systemc…

【Linux 命令】tree 对目录进行树形展示

目录 1、tree 命令功能展示 2、tree 命令安装 3、tree 命令语法及其参数功能 4、终止 tree 展开树命令 1、tree 命令功能展示 在 Linux 中&#xff0c;我们使用 ll 命令对目录的展示并不太方便我们查看&#xff0c;不太清晰明了&#xff0c;所以我们可以使用 tree 命令以…

Dubbo核心功能解析

Dubbo核心功能讲解 Dubbo是一个精耕服务治理领域的框架&#xff0c;秉承了阿里一贯的大而全风格&#xff0c;和Eureka相比复杂度有不小的提高&#xff0c;这一节我们选了Registry和Remoting两个核心模块&#xff0c;从功能层面做个简单的了解(后面的章节会深入介绍底层原理) …

渗透测试之Mimikatz2.2 如何抓取Win11登录明文密码

环境: 1.攻击者IP:192.168.1.35 系统: KALI2022(vmware 16.0) 2.靶机IP:192.168.1.16 系统: Windows11 3.USB无线网卡 4.Mimikatz 2.2 (win版) 问题描述: Mimikatz2.2 如何抓取Win11登录明文密码 解决方案: Wdigest WDigest协议是在WindowsXP中被引入的,旨在与H…