前言
我们知道,位于 Serialize 层上面的是负责网络传输的 Transport 层,它负责调用编解码器 Codec2 把要传输的对象编码后传输、再对接收到的字节序列解码。
站在客户端的角度,一次 RPC 调用的流程大概是这样的:
- Invoker 发起 RPC 调用请求
- Exchange 层负责数据交换,实现 Request-Response 语义
- Transport 层调用编码器对 Request 编码后发送,主线程阻塞等待
- IO 线程读取到服务端响应的数据,解码器解码后得到结果,唤醒主线程
清楚这个流程之后,我们尝试把 Dubbo 默认用 Netty 实现的传输层替换成我们自己实现的。
特别声明:Netty 已经做的足够好了,我们这么做并没有什么意义,只是为了加深你对 Dubbo 传输层工作流程的理解。
自定义Transport
新建一个模块dubbo-extension-transport-javasocket
用来封装我们自己的传输层实现。
因为要写的是 Dubbo 传输层的一个实现策略,所以要依赖dubbo-remoting-api
<dependencies>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-remoting-api</artifactId>
<version>${dubbo.version}</version>
</dependency>
</dependencies>
Transporter
传输层的核心 SPI 接口是 org.apache.dubbo.remoting.Transporter,我们自己实现一个。
JavaSocketTransporter 的核心是:
- Dubbo 开启服务时创建 JavaSocketServer
- 客户端和服务端建立连接时创建 JavaSocketClient
public class JavaSocketTransporter implements Transporter {
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new JavaSocketServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new JavaSocketClient(url, handler);
}
}
Channel
Dubbo 抽象了 org.apache.dubbo.remoting.Channel 接口来表示一个 tcp 连接,我们用的 Java Socket 实现,对应的类是 java.nio.channels.SocketChannel。但是我们要写一个类来把 SocketChannel 适配成 Dubbo 的 Channel。
Java SocketChannel 并不支持维护属性,Dubbo Channel 是支持的,所以我们专门搞个 Map 记录一下。
public class JavaSocketChannel extends AbstractChannel {
private static final ConcurrentMap<SocketChannel, JavaSocketChannel> CHANNEL_MAP = new ConcurrentHashMap<>();
private final SocketChannel socketChannel;
private final Codec2 codec;
private final Map<String, Object> attributes = new ConcurrentHashMap<>();
private JavaSocketChannel(Codec2 codec, SocketChannel sc, URL url, ChannelHandler handler) {
super(url, handler);
this.codec = codec;
this.socketChannel = sc;
}
static JavaSocketChannel getOrAddChannel(Codec2 codec, SocketChannel sc, URL url, ChannelHandler handler) {
JavaSocketChannel javaSocketChannel = CHANNEL_MAP.get(sc);
if (javaSocketChannel == null) {
javaSocketChannel = new JavaSocketChannel(codec, sc, url, handler);
}
return javaSocketChannel;
}
@Override
public InetSocketAddress getRemoteAddress() {
try {
return (InetSocketAddress) socketChannel.getRemoteAddress();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public boolean isConnected() {
return socketChannel.isConnected();
}
@Override
public boolean hasAttribute(String key) {
return attributes.containsKey(key);
}
@Override
public Object getAttribute(String key) {
return attributes.get(key);
}
@Override
public void setAttribute(String key, Object value) {
attributes.put(key, value);
}
@Override
public void removeAttribute(String key) {
attributes.remove(key);
}
@Override
public InetSocketAddress getLocalAddress() {
try {
return (InetSocketAddress) socketChannel.getLocalAddress();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
try {
ChannelBuffer channelBuffer = ChannelBuffers.directBuffer(1024);
codec.encode(this, channelBuffer, message);
socketChannel.write(channelBuffer.toByteBuffer());
} catch (Exception e) {
e.printStackTrace();
}
}
}
RemotingServer
再看服务端,Dubbo 提供了抽象类 org.apache.dubbo.remoting.transport.AbstractServer 实现了一些通用的逻辑,我们的 JavaSocketServer 直接继承它即可。
Dubbo 启动暴露服务时,会一并开启我们给定的 JavaSocketServer,方法是doOpen
JavaSocketServer 核心是:
- 通过 ServerSocketChannel 绑定本地端口来开启一个服务
- 处理客户端连接
- 网络IO数据读取、解码
public class JavaSocketServer extends AbstractServer {
private ServerSocketChannel serverSocketChannel;
private Map<SocketChannel, JavaSocketChannel> channelMap;
public JavaSocketServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}
@Override
protected void doOpen() throws Throwable {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(getBindAddress());
serverSocketChannel.configureBlocking(false);
channelMap = new ConcurrentHashMap<>();
// 开启一个线程来读网络数据
new Thread(() -> {
try {
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
// 建立新连接,丢到Map
socketChannel.configureBlocking(false);
channelMap.put(socketChannel, JavaSocketChannel.getOrAddChannel(getCodec(), socketChannel, getUrl(), getChannelHandler()));
}
// 遍历所有连接,看看是否有数据可读
for (Map.Entry<SocketChannel, JavaSocketChannel> entry : channelMap.entrySet()) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
int length = entry.getKey().read(byteBuffer);
if (length > 0) {
byteBuffer.flip();
// 读到新数据,尝试解码,交给后续handler处理
Object decode = getCodec().decode(entry.getValue(), ChannelBuffers.wrappedBuffer(byteBuffer));
if (decode != null) {
getDelegateHandler().received(entry.getValue(), decode);
}
}
}
Thread.sleep(10);// sleep一会,避免空转
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
@Override
protected void doClose() throws Throwable {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
}
@Override
public boolean isBound() {
return serverSocketChannel.isOpen();
}
@Override
public Collection<Channel> getChannels() {
return null;
}
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
return null;
}
}
Client
最后是客户端,Dubbo 提供了抽象类 org.apache.dubbo.remoting.transport.AbstractClient 实现了一些通用逻辑,我们的 JavaSocketClient 也直接继承它即可。
JavaSocketClient 核心是:
- 和服务端建立连接
- Request 对象编码后发送
- 读取服务端响应的数据、解码
public class JavaSocketClient extends AbstractClient {
private SocketChannel socketChannel;
public JavaSocketClient(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
}
@Override
protected void doOpen() throws Throwable {
socketChannel = SocketChannel.open();
System.err.println("client open");
}
@Override
protected void doClose() throws Throwable {
}
@Override
protected void doConnect() throws Throwable {
System.err.println("client connet");
if (socketChannel.connect(getConnectAddress())) {
new Thread(() -> {
try {
while (true) {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 10);
int length = socketChannel.read(byteBuffer);
if (length > 0) {
byteBuffer.flip();
Object decode = getCodec().decode(getChannel(), ChannelBuffers.wrappedBuffer(byteBuffer));
getDelegateHandler().received(getChannel(), decode);
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
@Override
protected void doDisConnect() throws Throwable {
}
@Override
protected Channel getChannel() {
return JavaSocketChannel.getOrAddChannel(getCodec(), socketChannel, getUrl(), getChannelHandler());
}
@Override
public void send(Object message, boolean sent) throws RemotingException {
try {
ChannelBuffer channelBuffer = ChannelBuffers.directBuffer(1024);
getCodec().encode(getChannel(), channelBuffer, message);
socketChannel.write(channelBuffer.toByteBuffer());
} catch (Exception e) {
e.printStackTrace();
}
}
}
至此,自定义的传输层逻辑就写完了。接下来是让 Dubbo 加载并使用我们自定义的实现,可以通过 SPI 机制。
创建META-INF/dubbo/org.apache.dubbo.remoting.Transporter
文件,编写内容:
javasocket=dubbo.extension.remoting.transport.javasocket.JavaSocketTransporter
服务端可以在 ProtocolConfig 里指定:
ProtocolConfig protocolConfig = new ProtocolConfig("dubbo", 20880);
protocolConfig.setServer("javasocket");
客户端可以在 ReferenceConfig 里配置参数指定:
Map<String, String> parameters = new HashMap<>();
parameters.put("client", "javasocket");
ReferenceConfig.setParameters(parameters);