Flink RPC初探

1.RPC概述

  RPC( Remote Procedure Call ) 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。 为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。 总而言之: RPC是为了解决分布式系统中,各个服务中的调用问题,在进行远程调用时,也像本地调用一样方便,让调用者感知不到远程调用的逻辑

2.RPC的调用分类

RPC的调用主要为两种:

  • 同步调用
    • 客户端等待调用执行完成并返回结果。
  • 异步调用
    • 客户端调用后不用等待结果返回,但是依然可以通过回调通知等方式获取返回结果。若客户端不关心调用返回结果,则变成单向异步调用,单向调用不用返回结果。

3.RPC流程图解

  当 user 想发起一个远程调用时,它实际是通过本地调用user-stub。user-stub 负责将调用的接口、方法和参数通过约定的协议规范进行编码并通过本地的 RPCRuntime 实例传输到远端的实例。远端 RPCRuntime 实例收到请求后交给 server-stub 进行解码后发起本地端调用,调用结果再返回给 user 端。
在这里插入图片描述

以计算器Calculator为例,如果实现类CalculatorImpl是放在本地的,那么直接调用即可:

在这里插入图片描述
现在系统变成分布式了,CalculatorImpl和调用方不在同一个地址空间,那么就必须要进行远程过程调用:

在这里插入图片描述
如何实现远程过程调用(RPC)?一个完整的RPC流程,可以用下面这张图来描述:

在这里插入图片描述
其中左边的Client,对应的就是前面的Service A,而右边的Server,对应的则是Service B。 下面详解流程:

  • Service A的应用层代码中,调用了Calculator的一个实现类的add方法,目标是执行加法运算;
  • Calculator实现类,内部并不是直接实现计算器的加减乘除逻辑,而是通过远程调用Service B的RPC接口,来获取运算结果,因此称之为Stub
  • 远程通信工具(图中的Run-time Library)实现Stub与Service B之间的通信,比如Java的Socket,,当然也可以用基于Http协议的HttpClient,或者其他通讯工具类,都可以,RPC并没有规定说要用何种协议进行通信
  • Stub通过调用调用远程通信工具提供的方法与Service B 建立连接,然后将请求数据发送给Service B。数据传输格式底层为二进制格式,例如 calculator.add(1,2),必须把参数值1和2封装到一个Request对象(包含数据以及其他服务调用对应RPC接口的信息),然后序列化为二进制格式,再传给通信工具类;
  • Service B 接收到Service A中通信工具传递过来的数据后,通过自己的通信工具接收二进制数据请求;
  • Service B中的Stub对二进制数据进行反序列化为请求对象;
  • Service B中通过反射去获取 Calculator的实际实现类去执行add方法;
  • RPC接口处理完成,返回执行结果。即为Service B如何将数据传送给 Service A ?
    • Service B 序列化结果–> Service A 通信工具解析请求–> Service A Stub反序列化请求–>结果返回Service A中的Application

4.RPC Demo

4.1 client(客户端)

  • 客户端发起RPC请求,ComsumerApp

    public class ComsumerApp {
        private static Logger log = LoggerFactory.getLogger(ComsumerApp.class);
    
        public static void main(String[] args) {
            Calculator calculator = new CalculatorRemoteImpl();
            int result = calculator.add(1, 2);
            log.info("result is {}", result);
        }
    }
    
  • 把RPC的逻辑封装到CalculatorRemoteImpl类中,客户端调用时感知不到远程调用的麻烦。 CalculatorRemoteImpl:

public class CalculatorRemoteImpl implements Calculator {
    public static final int PORT = 9090;
    private static Logger log = LoggerFactory.getLogger(CalculatorRemoteImpl.class);

    public int add(int a, int b) {
        //在分布式系统中,一个服务可能有多个实例,比如Service B,可能有ip地址为198.168.1.11和198.168.1.13两个实例,lookupProviders是在寻找要调用服务的实例列表。在分布式应用下,通常会有一个服务注册中心,来提供查询实例列表的功能。
        List<String> addressList = lookupProviders("Calculator.add");
        //查到实例列表之后,chooseTarget是要选择调用哪一个实例,其实内部就是一个负载均衡
        String address = chooseTarget(addressList);
        //实现一个简单的RPC,所以暂时不考虑服务注册中心和负载均衡,因此代码里返回ip地址为127.0.0.1。
        try {
            Socket socket = new Socket(address, PORT);//Socket通信

            // 将请求序列化
            CalculateRpcRequest calculateRpcRequest = generateRequest(a, b);
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

            // 将请求发给服务提供方
            objectOutputStream.writeObject(calculateRpcRequest);

            // 将响应体反序列化
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            Object response = objectInputStream.readObject();

            log.info("response is {}", response);

            if (response instanceof Integer) {
                return (Integer) response;
            } else {
                throw new InternalError();
            }

        } catch (Exception e) {
            log.error("fail", e);
            throw new InternalError();
        }
    }

    private CalculateRpcRequest generateRequest(int a, int b) {
        CalculateRpcRequest calculateRpcRequest = new CalculateRpcRequest();
        calculateRpcRequest.setA(a);
        calculateRpcRequest.setB(b);
        calculateRpcRequest.setMethod("add");
        return calculateRpcRequest;
    }

    private String chooseTarget(List<String> providers) {
        if (null == providers || providers.size() == 0) {
            throw new IllegalArgumentException();
        }
        return providers.get(0);
    }

    public static List<String> lookupProviders(String name) {
        List<String> strings = new ArrayList();
        strings.add("127.0.0.1");
        return strings;
    }
}

4.2 server(服务端)

  • 服务端是接收到请求,响应需求, ProviderApp:

    public class ProviderApp {
        private static Logger log = LoggerFactory.getLogger(ProviderApp.class);
    
        private Calculator calculator = new CalculatorImpl();
    
        public static void main(String[] args) throws IOException {
            new ProviderApp().run();
        }
    
        private void run() throws IOException {
            ServerSocket listener = new ServerSocket(9090);
            try {
                while (true) {
                    Socket socket = listener.accept();
                    try {
                        // 将请求反序列化
                        ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
                        Object object = objectInputStream.readObject();
    
                        log.info("request is {}", object);
    
                        // 调用服务
                        int result = 0;
                        if (object instanceof CalculateRpcRequest) {
                            CalculateRpcRequest calculateRpcRequest = (CalculateRpcRequest) object;
                            if ("add".equals(calculateRpcRequest.getMethod())) {
                                result = calculator.add(calculateRpcRequest.getA(), calculateRpcRequest.getB());
                            } else {
                                throw new UnsupportedOperationException();
                            }
                        }
    
                        // 返回结果
                        ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
                        objectOutputStream.writeObject(new Integer(result));
                    } catch (Exception e) {
                        log.error("fail", e);
                    } finally {
                        socket.close();
                    }
                }
            } finally {
                listener.close();
            }
        }
    }
    

    Server端主要是通过ServerSocket的accept方法,来接收Client端的请求,接着就是反序列化请求->执行->序列化执行结果,最后将二进制格式的执行结果返回给Client。

5.Flink RPC

5.1 概述

  在学习Flink RPC框架前,总结一哈常见大数据组件的RPC实现:

技术组件RPC实现
HDFSNetty
HBaseHBase-2.x 以前:NIO + ProtoBuf HBase-2.x 以后:Netty
ZooKeeperBIO(集群启动选举) + NIO(服务端处理客户端请求3.4) + Netty(3.6)
SparkSpark-1.x 基于 Akka Spark-2.x 基于 Netty
FlinkAkka + Netty

总言之:Flink的RPC实现:基于Scala的网络编程库:Akka。

5.2 前置知识

  • ActorSystem(重量级的系统对象) 是管理 Actor生命周期的组件, Actor是负责进行通信的组件。
  • 每个 Actor 都有一个 MailBox,其他 Actor 发送给它的消息首先储存在 MailBox 中,通过这种方式可以实现异步通信。
  • 每个 Actor 是单线程处理方式,不断从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。
  • 每个ActorSystem 和 Actor在启动时会给定一个 name,如果要从ActorSystem中获取一 个 Actor,则通过以下的方式来进行 Actor的获取:akka.tcp://asname@bigdata02:9527/user/actorname 。
  • 如果一个Actor和另外一个 Actor进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。
  • 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步返回处理结果。

5.3 Flink RPC详解

  Flink 中 RPC 的框架设计的主要类:
在这里插入图片描述

  Flink 中的 RPC 实现主要在 flink-runtime 模块下的 org.apache.flink.runtime.rpc 包中,涉及到的最重要的 API 主要是以下这四个: RpcGatewayRpcServerRpcEndpoint 以及 RpcService

5.3.1 RpcGateway

  Flink的RPC协议通过RpcGateway来定义,主要定义通信行为;用于远程调用RpcEndpoint的某些方法,可以理解为客服端代理。

/**
 * Rpc gateway interface which has to be implemented by Rpc gateways.
 */
public interface RpcGateway {

	/**
	 * Returns the fully qualified address under which the associated rpc endpoint is reachable.
	 *
	 * @return Fully qualified (RPC) address under which the associated rpc endpoint is reachable
	 */
	String getAddress();

	/**
	 * Returns the fully qualified hostname under which the associated rpc endpoint is reachable.
	 *
	 * @return Fully qualified hostname under which the associated rpc endpoint is reachable
	 */
	String getHostname();
}

5.3.2 RpcServer

  RpcServer负责接收响应远端RPC消息请求。有两个实现:AkkaInvocationHandler和FencedAkkaInvocationHandler。RpcServer 是 Actor 与 RpcEndpoint 两层之间的粘合层。

/**
 * Interface for self gateways.
 */
public interface RpcServer extends StartStoppable, MainThreadExecutable, RpcGateway {

	/**
	 * Return a future which is completed when the rpc endpoint has been terminated.
	 *
	 * @return Future indicating when the rpc endpoint has been terminated
	 */
	CompletableFuture<Void> getTerminationFuture();
}
5.3.3 RpcEndPoint

  RpcEndpoint是通信终端,提供RPC服务组件的生命周期管理(start、stop)。每个RpcEndpoint对应了一个路径(endpointId和actorSystem共同确定),每个路径对应一个Actor,其实现了RpcGateway接口,其构造函数如下:

protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    	// 保存rpcService和endpointId
		this.rpcService = checkNotNull(rpcService, "rpcService");
		this.endpointId = checkNotNull(endpointId, "endpointId");
		/*
		 *  注释:通过RpcService启动 ResourceManager的RPCServer服务
		 *  此处启动的是 ResourceManager 的 RPC 服务端,在接收TaskManager启动完成信息之后,进行注册和心跳,来汇报Taskmanager的资源情况。
		 *  通过动态代理的形式构建了一个Server。
		 */
		this.rpcServer = rpcService.startServer(this);
		// 主线程执行器,所有调用在主线程中串行执行
		this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
	}

  RpcEndpoint 下面有四个比较重要的子类: TaskExecutor、Dispatcher、JobMaster、ResourceManager ;当在任意地方要创建这四个组件中任何一个组件的实例对象时,创建成功之后,都会要去执行其中的 onStart()方法 ,在集群启动的源码分析中,其实这些组件很多的工作流程,都被放在 onStart() 里面。

5.3.4 RpcService

  RpcService 是Rpc服务的接口,其主要作用如下:

  • 根据提供的RpcEndpoint来启动和停止RpcServer(Actor);
  • 根据提供的地址连接到RpcServer,并返回一个RpcGateway;
  • 延迟/立刻调度Runnable、Callable;

6.总结

  • RPC是为了解决分布式系统中,不同服务之间的调用问题,让远程调用也如同本地调用一样丝滑。
  • RpcGateway 是所有 RPC 的祖宗,各种RPC组件均是RpcGateway的子类。
  • RpcEndpoint是业务载体,对应Actor的封装。
  • RpcService 是 Rpc服务的接口,对应ActorSystem的封装。
  • RpcServer 是 RpcService 与 RpcEndpoint 之间的粘合层。
  • RpcEndpoint 下面有四个比较重要的子类: TaskExecutor、Dispatcher、JobMaster、ResourceManager,且当实例化其中的一个组件对象成功后会执行对应的onStart()方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/497367.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

理想汽车推出首个全自研大模型Mind GPT,通过国家备案正式上线

理想汽车在今日宣布&#xff0c;其全自研的多模态认知大模型——Mind GPT&#xff0c;已正式通过国家《生成式人工智能服务管理暂行办法》的备案&#xff0c;并成功上线&#xff0c;标志着理想成为首个拥有自研大模型并通过国家备案的汽车厂商。 理想Mind GPT是汽车行业首个专门…

HTTP——Cookie

HTTP——Cookie 什么是Cookie通过Cookie访问网站 我们之前了解了HTTP协议&#xff0c;如果还有小伙伴还不清楚HTTP协议&#xff0c;可以点击这里&#xff1a; https://blog.csdn.net/qq_67693066/article/details/136895597 我们今天来稍微了解一下HTTP里面一个很小的部分&…

报表控件Stimulsoft Reports、Dashboards 和 Forms 新版v2024.2发布!

我们很高兴地宣布发布用于创建报告、仪表板和表单的最新版本的 Stimulsoft 产品 - 2024.2&#xff01;在此更新中&#xff0c;您将找到适用于 Python 应用程序和服务的产品、新的仪表板元素、我们的组件与 .NET 8.0 的兼容性、仪表板交互性的增强功能等等。 Stimulsoft Ultima…

通过WSL在阿里云上部署Vue项目

参考&#xff1a; 阿里云上搭建网站-CSDN博客 云服务器重装 关闭当前运行实例 更换操作系统&#xff0c;还有其他的进入方式。 选择ubuntu系统&#xff08;和WSL使用相同的系统&#xff09;。 设置用户和密码。发送短信验证码。 新系统更新。秒速干净的新系统设置完成。 这…

Ansible-1

Ansible是一款自动化运维、批量管理服务器的工具&#xff0c;批量系统配置、程序部署、运行命令等功能。基于Python开发&#xff0c;基于ssh进行管理&#xff0c;不需要在被管理端安装任何软件。Ansible在管理远程主机的时候&#xff0c;只有是通过各种模块进行操作的。 需要关…

华润电力2024届校招人才测评通知润择认知能力测评考什么?

华润电力2024届校招人才测评通知润择认知能力测评考什么&#xff1f; 第一部分&#xff0c;认知测评润择-认知能力测评 您好! 本次测评包含逻辑推理、数字推理、语言理解三大类型的问卷。共计58题。 测评限时60分钟。其中逻辑推理、数字推理、语言推理分别限时20分钟。 如逾时…

Dubbo管理平台安装部署

访问dubbo-admin访问报404 源码自行到githup下载&#xff0c;在idea打开&#xff0c;然后在控制台使用命令打包&#xff0c;然后拖拽到tomcat的webapps目录 #打包命令mvn package -Dmaven.skip.testtrue 修改tomcat端口 修改dubbo-admin 的zookeeper地址

【黑马头条】-day04自媒体文章审核-阿里云接口-敏感词分析DFA-图像识别OCR-异步调用MQ

文章目录 day4学习内容自媒体文章自动审核今日内容 1 自媒体文章自动审核1.1 审核流程1.2 内容安全第三方接口1.3 引入阿里云内容安全接口1.3.1 添加依赖1.3.2 导入aliyun模块1.3.3 注入Bean测试 2 app端文章保存接口2.1 表结构说明2.2 分布式id2.2.1 分布式id-技术选型2.2.2 雪…

uniApp使用XR-Frame创建3D场景(6)播放模型动画

上篇文章讲述了如何将XR-Frame作为子组件集成到uniApp中使用 这篇我们讲解播放模型动画 先看源码 <xr-scene render-system"alpha:true" bind:ready"handleReady"> <xr-node visible"{{sec6}}"><xr-light type"ambient&qu…

uniapp vue-cli项目配置devServer和outPutDir

上一次说了使用vue-cli创建uni-app项目&#xff0c;然后使用第三方工具开发打包&#xff0c;利用jekins发布等。 这一次我想解决以下问题&#xff1a; 1我想配置devServer供开发环境使用&#xff0c;这样也可以解决开发环境的跨域问题 2以前我的每个版本信息都配置在package…

Pycharm服务器配置python解释器并结合内网穿透实现公网远程开发

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

Excel:使用VLOOKUP函数,抓取指定数据,后一个列

Excel:使用VLOOKUP函数&#xff0c;抓取指定数据&#xff0c;后一个列 我们有这样一个数据源 要是实现这个页面的赋值 就是对应关系映射 使用 VLOOKUP(A2,Sheet2!$A$2:$B$9,2,FALSE)第一个参数是需要匹配的单元格。 第二个参数是数据源&#xff0c;我这里数据源用的是shee…

没学数模电可以玩单片机吗?

我们首先来看一下数电模电在单片机中的应用。数电知识在单片机中主要解决各种数字信号的处理、运算&#xff0c;如数制转换、数据运算等。模电知识在单片机中主要解决各种模拟信号的处理问题&#xff0c;如采集光照强度、声音的分贝、温度等模拟信号。而数电、模电的相互转换就…

单片机之串口通信

目录 串口介绍 通信的基本概念 并行通信和串行通信 同步通信和异步通信 串行异步通信方式 串行同步通信方式 通信协议 单片机常见通信接口 串行通信三种模式 串口参数 传输速度 ​串口的连接 电平标准 串行口的组成 串口数据缓冲寄存器 串行口控制寄存器 串口…

【IntelliJ IDEA】运行测试报错解决方案(附图)

IntelliJ IDEA 版本 2023.3.4 (Ultimate Edition) 测试报错信息 命令行过长。 通过 JAR 清单或通过类路径文件缩短命令行&#xff0c;然后重新运行 解决方案 修改运行配置&#xff0c;里面如果没有缩短命令行&#xff0c;需要再修改选项里面勾选缩短命令行让其显示&#x…

python基础 | 核心库:PIL

1、读取图像信息 查看图像信息 读取同一文件夹下的文件 可加 ./可不加 rom PIL import Image img Image.open(image.jpg) # 打开图像文件(注意:是去掉文件头的纯数据) print(img.format) # 图像格式(如BMP PNG JPEG 等) print(img.size) # 图像大小(…

灵动翻译音频文件字幕提取及翻译;剪映视频添加字幕

参考&#xff1a;视频音频下载工具 https://tuberipper.com/21/save/mp3 1、灵动翻译音频文件字幕提取及翻译 灵动翻译可以直接chorme浏览器插件安装&#xff1a; 点击使用&#xff0c;可以上传音频文件 上传后自动翻译&#xff0c;然后点击译文即可翻译成中文&#xff0c;…

【VMware Workstation】公司所有主机和虚拟机ip互通,以及虚拟机目录迁移

文章目录 1、场景2、环境3、实战3.1、所有主机和虚拟机ip互通Stage 1 : 【虚拟机】设置为桥接模式Stage 2 : 【虚拟机】设置ipStage 3 : 【路由器】ARP 静态绑定MACStage 3-1 ping 路由器 ipStage 3-2 【静态绑定】虚拟机查看mac地址Stage 3-3 【静态绑定】路由器ARP 静态绑定 …

【QT+QGIS跨平台编译】040:【geos_c+Qt跨平台编译】(一套代码、一套框架,跨平台编译)

点击查看专栏目录 文章目录 一、geos_c介绍二、文件下载三、文件分析四、pro文件五、编译实践一、geos_c介绍 GEOS_C(GEOS C++接口)是GEOS库的C语言版本,它提供了一套丰富的API,允许开发者在C++程序中执行复杂的几何形状处理和空间关系分析。GEOS_C是基于JTS(Java Topolog…

【MySQL】MySQL5.6---windows版本安装(附安装包)

期待您的关注 mysql5.6点击此处下载 提取码&#xff1a;gckb 我将mysql5.6-windows版本的解压包放到了上方的连接当中&#xff0c;如若不想使用我提供的安装包大家还可以在官方网站自行下载。 官方地址&#xff1a;https://dev.mysql.com/downloads/mysql/ 首先检查本地有没…