Netty 实现dubbo rpc

一、RPC 的基本介绍

  RPC (Remote Procedure Call) 远程过程调用,是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外的为这个交互编程。也就是说可以达到两个或者多个应用程序部署在不同的服务器上,他们之间的调用都像是本地方法调用一样。RPC 的调用如下图。

常用的RPC 框架有阿里的dubbo,Google的gRPC,Go 语言的rpcx,Apache的thrift,Spring的Spring Cloud.

若想了解dubbo与Spring Cloud的区别参考:SpringCloud 与 Dubbo 的区别,终于有人讲明白了...-腾讯云开发者社区-腾讯云

二、RPC 调用的过程

在RPC 中,Client 端叫做服务消费者,Server 叫做服务提供者。

调用流程说明

  • 服务消费方(client)以本地调用方式调用服务
  • client stub 接收到调用后负责将方法、参数等封装成能够进行网络传输的消息体
  • client stub 将消息进行编码并发送到服务端
  • server stub 接收到消息后进行解码
  • server stub 根据解码结果调用本地的服务
  • 本地服务执行并将结果返回给server stub
  • server stub 将返回导入结果进行编码并发送给消费方
  • client stub 接收到消息并进行解码
  • 服务消费方(client) 得到结果
  • 其中,RPC 框架的目标就是把2-8 这些步骤封装起来,用户无需关心这些细节,可以像调用本地方法一样即可完成远程服务调用。

三、dubbo RPC

1.需求说明

dubbo 底层使用了Netty 作为网络通信框架,要求用netty 实现一个简单的RPC框架。

模仿dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者的服务,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信给予Netty 4.x。

2.设计说明

创建一个接口,定义抽象方法。用于消费者和提供者之间的约定。

创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。

创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用netty请求提供者返回数据。 开发的分析图如下:

3.代码实现

netty用的包:4.1.20.Final。pom.xml如下:

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.20.Final</version>
</dependency>

1)公共接口

/**
 * @author: fqtang
 * @date: 2024/05/05/21:51
 * @description: 服务提供方和服务消费方都需要
 */
public interface HelloService {

	String say(String mes);
}

2)公共接口实现类

import org.springframework.util.StringUtils;
import com.tfq.netty.dubborpc.publicinterface.HelloService;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:53
 * @description: 描述
 */
public class HelloServiceImpl implements HelloService {

	private static int count = 0;

	/**
	 * 当有消费方调用该方法时就返回一个结果
	 *
	 * @param mes 传入消息
	 * @return 返回结果
	 */
	@Override
	public String say(String mes) {
		System.out.println("收到客户端消息=" + mes);
		if(StringUtils.isEmpty(mes)) {
			return "你好客户端,我已经收到你的消息 ";
		}else{
			return "你好客户端,我已经收到你的消息:【" + mes+"】,第 "+(++count)+"次。";
		}
	}
}

3)服务提供者

import com.tfq.netty.dubborpc.netty.NettyServer;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:57
 * @description: 启动服务提供者,就是NettyServer
 */
public class ServerBootstrap {

	public static void main(String[] args) {

		String hostName="127.0.0.1";
		int port = 8001;
		NettyServer.startServer(hostName,port);
	}

}



import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: fqtang
 * @date: 2024/05/05/21:59
 * @description: 描述
 */
public class NettyServer {

	public static void startServer(String hostName,int port){
		startServer0(hostName,port);
	}

	/**
	 * 编写一个方法,完成对Netty Server的初始化工作和启动
	 * @param hostName
	 * @param port
	 */
	private static void startServer0(String hostName,int port){
		EventLoopGroup bossGroup = new NioEventLoopGroup(1);
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try{
			ServerBootstrap serverBootstrap = new ServerBootstrap();

			serverBootstrap.group(bossGroup,workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline pipeline = ch.pipeline();
						pipeline.addLast(new StringDecoder());
						pipeline.addLast(new StringEncoder());
						pipeline.addLast(new NettyServerHandler());
					}
				});

			ChannelFuture channelFuture = serverBootstrap.bind(hostName,port).sync();
			System.out.println("服务提供方开始提供服务~~~");
			channelFuture.channel().closeFuture().sync();
		}catch(Exception e){
			e.printStackTrace();
		}finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}

	}
}



import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import com.tfq.netty.dubborpc.consumer.ClientBootstrap;
import com.tfq.netty.dubborpc.provider.HelloServiceImpl;

/**
 * @author: fqtang
 * @date: 2024/05/05/22:03
 * @description: 描述
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//获取客户端调用的消息,并调用服务
		System.out.println("msg = " + msg);
		//客户端在调用服务器的时候,需要定义一个协议。比如我们要求每次发消息时,都必须以某个字符器开头
		//比如:dubboserver#hello#xxxx
		if(msg.toString().startsWith(ClientBootstrap.ProtocolHeader)) {
			String res = new HelloServiceImpl().say(msg.toString()
				.substring(msg.toString()
					.lastIndexOf("#") + 1));
			ctx.writeAndFlush(res);
		}
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}
}


4)消费者

import com.tfq.netty.dubborpc.netty.NettyClient;
import com.tfq.netty.dubborpc.publicinterface.HelloService;

/**
 * @author: fqtang
 * @date: 2024/05/05/23:26
 * @description: 消费者
 */
public class ClientBootstrap {

	/**
	 * 这里定义协议头
	 */
	public static final String ProtocolHeader = "dubboserver#say#";

	public static void main(String[] args) throws InterruptedException {
		//创建一个消费者
		NettyClient customer = new NettyClient();
		//创建代理对象
		HelloService helloService = (HelloService) customer.getBean(HelloService.class, ProtocolHeader);
		while(true) {
			Thread.sleep(10 * 1000);
			//通过代理对象调用提供者的方法(服务)
			String res = helloService.say("你好 dubbo~");
			System.out.println("调用的结果 res = " + res);
		}
	}
}



import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * @author: fqtang
 * @date: 2024/05/05/23:04
 * @description: 描述
 */
public class NettyClient {

	//创建一个线程池
	private static ExecutorService executorService = Executors.newFixedThreadPool(2);

	private static NettyClientHandler clientHandler;

	/**
	 * 编写方法使用代理模式,获取一个代理对象
	 * @param serviceClass
	 * @param protocolHeader
	 * @return
	 */
	public Object getBean(final Class<?> serviceClass, final String protocolHeader) {

		return Proxy.newProxyInstance(Thread.currentThread()
				.getContextClassLoader(),
			new Class<?>[]{serviceClass}, (proxy, method, args) -> {
				if(clientHandler == null) {
					initClient("127.0.0.1", 8001);
				}
				//设置要发送给服务器端的信息,protocolHeader为协议头[dubboserver#hello#],
				//args[0] 就是客户端调用api say(???),参数
				clientHandler.setParam(protocolHeader + args[0]);
				return executorService.submit(clientHandler).get();
			});
	}

	private static void initClient(String hostName, int port) {
		EventLoopGroup worker = new NioEventLoopGroup();
		try {
			clientHandler = new NettyClientHandler();
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(worker)
				.channel(NioSocketChannel.class)
				.option(ChannelOption.TCP_NODELAY, true)
				.handler(new ChannelInitializer<SocketChannel>() {
					@Override
					protected void initChannel(SocketChannel ch) throws Exception {
						ChannelPipeline channelPipeline = ch.pipeline();
						channelPipeline.addLast(new StringDecoder());
						channelPipeline.addLast(new StringEncoder());
						channelPipeline.addLast(clientHandler);
					}
				});

			ChannelFuture channelFuture = bootstrap.connect(hostName, port)
				.sync();
			/*channelFuture.channel()
				.closeFuture()
				.sync();*/
		} catch(InterruptedException e) {
			e.printStackTrace();
		} /*finally {
			worker.shutdownGracefully();
		}*/
	}
}



package com.tfq.netty.dubborpc.netty;

import java.util.concurrent.Callable;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author: fqtang
 * @date: 2024/05/05/22:48
 * @description: 描述
 */
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {

	private ChannelHandlerContext context;
	/**
	 * 返回的结果
	 */
	private String result;
	/**
	 * 客户端调用方法返回的参数
	 */
	private String param;

	/**
	 * 与服务器的连接创建后,就会被调用,这个方法被第一个,调用(1)
	 * @param ctx
	 * @throws Exception
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		//因为在其他方法会使用到这个ctx
		context = ctx;
		System.out.println("调用(1) channelActive--->连接到服务器");
	}

	/**
	 *  被调用(4)
	 * 收到服务器的数据后,调用方法
	 * @param ctx
	 * @param msg
	 * @throws Exception
	 */
	@Override
	public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		result = (String) msg;
		System.out.println("调用(4)channelRead--->从服务器读取到数据:"+result);
		//唤醒等待的线程
		notify();
		System.out.println("调用(4)channelRead---notify()---->从服务器读取到数据后唤醒线程.....");
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

	/**
	 * 被调用(3), 被调用(5)
	 * 被代理对象调用,发送数据给服务器,--->wait ---> 等待被唤醒 --->返回结果
	 * @return
	 * @throws Exception
	 */
	@Override
	public synchronized Object call() throws Exception {
		context.writeAndFlush(param);
		System.out.println("调用(3) call()--->被代理对象调用,发送数据给服务器.....");
		//进行wait,等待channelRead 方法获取到服务器的结果后,唤醒
		wait();
		System.out.println("调用(5) call()--->wait() 等待channelRead 方法获取到服务器的结果后.....");
		return result;
	}

	/**
	 * 被调用(2)
	 * @param param
	 */
	void setParam(String param){
		System.out.println("调用(2) setParam()--->被代理对象调用,发送数据给服务器.....");
		this.param = param;
	}
}

若有问题请留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/595337.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OpenCV 入门(七)—— 身份证识别

OpenCV 入门系列&#xff1a; OpenCV 入门&#xff08;一&#xff09;—— OpenCV 基础 OpenCV 入门&#xff08;二&#xff09;—— 车牌定位 OpenCV 入门&#xff08;三&#xff09;—— 车牌筛选 OpenCV 入门&#xff08;四&#xff09;—— 车牌号识别 OpenCV 入门&#xf…

德国韦纳WENAROLL滚压刀,液压缸,滚光刀,挤压刀,滚轧刀

德国韦纳WENAROLL滚压刀,液压缸&#xff0c;滚光刀,挤压刀&#xff0c;滚轧刀&#xff08;百度一下&#xff0c;西安尚融&#xff09; 德国韦纳&#xff08;WENAROLL&#xff09;的滚压刀、液压缸、滚光刀、挤压刀和滚轧刀在工业领域享有很高的声誉&#xff0c;这些产品因其高…

SM618卡件SM480模块和利时

SM618卡件❗电:183-6998-1851❗SM480模块和利时。自动化程度的提高&#xff0c;I/O点数大幅增 加&#xff0c;传统单一配线的方式已经无法满足发展的需 要SM618卡件SM480模块和利时。&#xff0e;对简单、可靠的配线方式的需求日益强烈&#xff0e; 传统接线 - 以并联方式连 接…

C# WinForm —— 12 ListBox绑定数据

ListBox加载大量数据时&#xff0c;避免窗体闪烁的方法&#xff1a; 在加载语句的前后分别加上 BeginUpdate()方法 和 EndUpdate()方法 指定一个集合为绑定的数据源 1. 首先&#xff0c;右键项目&#xff0c;添加类 2. 在新建的类文件中添加属性值信息 3. 构建初始化的对象…

访问学者在外访学期间,是否可以中途回国?

在全球化的今天&#xff0c;访问学者制度已成为促进国际学术交流与合作的重要桥梁。然而&#xff0c;对于许多国外访问学者来说&#xff0c;一个常见的问题是&#xff1a;在访学期间&#xff0c;我是否可以中途回国&#xff1f;这个问题涉及到多个方面&#xff0c;包括政策法规…

7步教程从零开始搭建跨境电商平台开发

跨境电商平台开发一直是创业者们追逐的热门领域之一。本文将为您提供一个7步教程&#xff0c;帮助您从零开始搭建跨境电商平台&#xff0c;让您在这个充满机遇的领域中抢占先机。 步骤一&#xff1a;市场调研和定位 在开始搭建跨境电商平台之前&#xff0c;第一步是进行充分的…

大数据与会计专业主要学什么课程

大数据与会计专业是一个结合了传统会计知识与现代大数据技术的交叉学科&#xff0c;旨在培养既懂会计又熟悉大数据分析的复合型人才。该专业的学生将会学习以下主要课程内容&#xff1a; 会计基础课程&#xff1a;包括基础会计、财务会计、成本会计、管理会计等&#xff0c;这些…

我独自升级崛起下载教程 我独自升级崛起怎么一键下载

定于5月8日全球盛大发布的动作RPG力作《我独自升级崛起》&#xff0c;基于备受追捧的同名动画及网络漫画&#xff0c;誓为热情洋溢的游戏爱好者们呈献一场深度与广度兼具的冒险盛宴。这款游戏巧妙融合网络武侠元素&#xff0c;其创意十足的设计框架下&#xff0c;核心叙述聚焦于…

OSPF综合实验(超详细易懂)(HCIP)

1、拓扑信息 2、需求分析 3、IP规划 4、配置 5、测试 1、拓扑信息 2、需求分析 R4为ISP&#xff0c;其上只能配置I地址&#xff1b; R4与其他所有直连设备间均使用公有IP 公网中使用的是点到…

外贸大客户开发的三大困境

外贸大客户开发的三大困境&#xff0c;第一个是进不来&#xff0c;什么叫进不来呢&#xff1f;就是客户&#xff0c;大客户他不仅能够为企业带来大额的业绩&#xff0c;而且利润也高&#xff0c;那么也对于这种品牌也有一定的关联&#xff0c;还能为企业带来更多的一些资源&…

Python测试框架Pytest的参数化详解

上篇博文介绍过&#xff0c;Pytest是目前比较成熟功能齐全的测试框架&#xff0c;使用率肯定也不断攀升。 在实际工作中&#xff0c;许多测试用例都是类似的重复&#xff0c;一个个写最后代码会显得很冗余。这里&#xff0c;我们来了解一下pytest.mark.parametrize装饰器&…

karateclub,一个超酷的 Python 库!

更多资料获取 &#x1f4da; 个人网站&#xff1a;ipengtao.com 大家好&#xff0c;今天为大家分享一个超酷的 Python 库 - karateclub。 Github地址&#xff1a;https://github.com/benedekrozemberczki/karateclub Python karateclub是一个用于图嵌入和图聚类的库&#xff…

git commit 提交报错pre-commit hook failed (add --no-verify to bypass) 解决方法,亲测有效

问题截图 今天在执行 git commit 命令时报错&#xff1a;pre-commit hook failed (add --no-verify to bypass) 解决 参考文章&#xff1a;git commit报错&#xff1a;pre-commit hook failed的解决方法 具体原理什么的就不解释了&#xff0c;可以看看上面的参考文章 解决方…

如何使用高德地图的 Loca 展示 gpx 文件的 3D 路径,Loca.LineLayer

如何使用高德地图的 Loca 展示 gpx 文件的 3D 路径&#xff0c;Loca.LineLayer 找寻了好久&#xff0c;终于将这个展示 3D 路径的功能实现了。 在线实例&#xff1a; http://kylebing.cn/tools/map/#/gpx/gpx-viewer-3d 这里是用于展示 gpx 路径&#xff0c;关于 gpx 的相关知…

Linux migrate_type进一步探索

文章接着上回Linux migrate_type初步探索 1、物理页面添加到buddy系统 我们都知道物理内存一开始是由memblock进行分配管理&#xff0c;后面会切换到buddy系统管理。那么接下来我们看一下&#xff0c;memblock管理的物理页面是怎么添加到buddy系统中的。 start_kernel() -&g…

液晶数显式液压万能试验机WES-300B

一、简介 主机为两立柱、两丝杠、油缸下置式&#xff0c;拉伸空间位于主机的上方&#xff0c;压缩、弯曲试验空间位于主机下横梁和工作台之间。测力仪表采用高清液晶显示屏&#xff0c;实验数据方便直观。 主要性能技术指标 最大试验力&#xff08;kN&#xff09; 300 试…

文件删了,回收站清空了怎么恢复?文件恢复软件一览

在日常生活和工作中&#xff0c;我们常常会遇到误删除文件的情况&#xff0c;有时甚至会因为清空了回收站而无法找回这些文件。这些文件可能包含重要的工作数据、个人照片或其他珍贵的回忆。那么&#xff0c;在这种情况下&#xff0c;我们该如何恢复这些被删除且清空回收站的文…

外婆传(封家香传)

余乃民国三十载&#xff08;公元一千九百四十一&#xff09;九月初九重阳佳节日出生于衡阳县长塘村封谷里。父封盖梅&#xff0c;在民国二十九年&#xff08;公元一千九百四十&#xff09;驾鹤西归&#xff0c;遗世独立&#xff0c;吾未能见其颜。母氏&#xff0c;因丧夫之痛&a…

C++ | Leetcode C++题解之第59题螺旋矩阵II

题目&#xff1a; 题解&#xff1a; class Solution { public:vector<vector<int>> generateMatrix(int n) {int num 1;vector<vector<int>> matrix(n, vector<int>(n));int left 0, right n - 1, top 0, bottom n - 1;while (left < r…

微信公众号排名 SEO的5个策略

随着微信公众号在社交媒体领域的持续发展和普及&#xff0c;如何提升公众号的搜索排名&#xff0c;成为许多运营者关注的焦点。公众号排名SEO&#xff0c;即针对微信公众号进行搜索引擎优化&#xff0c;旨在提高公众号在搜索结果中的曝光率和点击率。下面&#xff0c;我们将深入…