Flink 源码 TaskManagerRunner 启动 Akka Actor System 源码

TaskManagerRunner 启动 Akka Actor System 源码关键流程

private void startTaskManagerRunnerServices() throws Exception {
	synchronized (lock) {
    rpcSystem = RpcSystem.load(configuration);

    this.executor =
            Executors.newScheduledThreadPool(
                    Hardware.getNumberCPUCores(),
                    new ExecutorThreadFactory("taskmanager-future"));

    highAvailabilityServices =
            HighAvailabilityServicesUtils.createHighAvailabilityServices(
                    configuration,
                    executor,
                    AddressResolution.NO_ADDRESS_RESOLUTION,
                    rpcSystem,
                    this);

    JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

    rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);

		// ...
}

// TaskManagerRunner
static RpcService createRpcService(
        final Configuration configuration,
        final HighAvailabilityServices haServices,
        final RpcSystem rpcSystem)
        throws Exception {

    checkNotNull(configuration);
    checkNotNull(haServices);
return RpcUtils.createRemoteRpcService(
            rpcSystem,
            configuration,
						determineTaskManagerBindAddress(configuration, haServices, rpcSystem),
						//...
						);

}

// RpcUtils
public static RpcService createRemoteRpcService(
        RpcSystem rpcSystem,
        Configuration configuration,
        @Nullable String externalAddress,
        String externalPortRange,
        @Nullable String bindAddress,
        @SuppressWarnings("OptionalUsedAsFieldOrParameterType") Optional<Integer> bindPort)
        throws Exception {
    RpcSystem.RpcServiceBuilder rpcServiceBuilder =
            rpcSystem.remoteServiceBuilder(configuration, externalAddress, externalPortRange);
    if (bindAddress != null) {
        rpcServiceBuilder = rpcServiceBuilder.withBindAddress(bindAddress);
    }
    if (bindPort.isPresent()) {
        rpcServiceBuilder = rpcServiceBuilder.withBindPort(bindPort.get());
    }
    return rpcServiceBuilder.createAndStart();
}




// AkkaRpcServiceUtils
public AkkaRpcService createAndStart(
            TriFunction<ActorSystem, AkkaRpcServiceConfiguration, ClassLoader, AkkaRpcService>
                    constructor)
            throws Exception {
        if (actorSystemExecutorConfiguration == null) {
            actorSystemExecutorConfiguration =
                    AkkaUtils.getForkJoinExecutorConfig(
                            AkkaBootstrapTools.getForkJoinExecutorConfiguration(configuration));
        }

        final ActorSystem actorSystem;

        // akka internally caches the context class loader
        // make sure it uses the plugin class loader
        try (TemporaryClassLoaderContext ignored =
                TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
            if (externalAddress == null) {
                // create local actor system
                actorSystem =
                        AkkaBootstrapTools.startLocalActorSystem(
                                configuration,
                                actorSystemName,
                                logger,
                                actorSystemExecutorConfiguration,
                                customConfig);
            } else {
                // create remote actor system
                actorSystem =
                        AkkaBootstrapTools.startRemoteActorSystem(
                                configuration,
                                actorSystemName,
                                externalAddress,
                                externalPortRange,
                                bindAddress,
                                Optional.ofNullable(bindPort),
                                logger,
                                actorSystemExecutorConfiguration,
                                customConfig);
            }
        }

        return constructor.apply(
                actorSystem,
                AkkaRpcServiceConfiguration.fromConfiguration(configuration),
                RpcService.class.getClassLoader());
    }
}


// AkkaBootstrapTools
private static ActorSystem startRemoteActorSystem(
        Configuration configuration,
        String actorSystemName,
        String externalAddress,
        int externalPort,
        String bindAddress,
        int bindPort,
        Logger logger,
        Config actorSystemExecutorConfiguration,
        Config customConfig)
        throws Exception {

    String externalHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(externalAddress, externalPort);
    String bindHostPortUrl =
            NetUtils.unresolvedHostAndPortToNormalizedString(bindAddress, bindPort);
    logger.info(
            "Trying to start actor system, external address {}, bind address {}.",
            externalHostPortUrl,
            bindHostPortUrl);

    try {
        Config akkaConfig =
                AkkaUtils.getAkkaConfig(
                        configuration,
                        new HostAndPort(externalAddress, externalPort),
                        new HostAndPort(bindAddress, bindPort),
                        actorSystemExecutorConfiguration);

        if (customConfig != null) {
            akkaConfig = customConfig.withFallback(akkaConfig);
        }

        return startActorSystem(akkaConfig, actorSystemName, logger);
    } catch (Throwable t) {
        if (t instanceof ChannelException) {
            Throwable cause = t.getCause();
            if (cause != null && t.getCause() instanceof BindException) {
                throw new IOException(
                        "Unable to create ActorSystem at address "
                                + bindHostPortUrl
                                + " : "
                                + cause.getMessage(),
                        t);
            }
        }
        throw new Exception("Could not create actor system", t);
    }
}




private static ActorSystem startActorSystem(
        Config akkaConfig, String actorSystemName, Logger logger) {
    logger.debug("Using akka configuration\n {}", akkaConfig);
    ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);

    logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
    return actorSystem;
}



// AkkaUtils
public static ActorSystem createActorSystem(String actorSystemName, Config akkaConfig) {
    // Initialize slf4j as logger of Akka's Netty instead of java.util.logging (FLINK-1650)
    InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
    return RobustActorSystem.create(actorSystemName, akkaConfig);
}


public static RobustActorSystem create(String name, Config applicationConfig) {
    return create(name, applicationConfig, FatalExitExceptionHandler.INSTANCE);
}


@VisibleForTesting
static RobustActorSystem create(
        String name,
        Config applicationConfig,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
    return create(
            name,
            ActorSystemSetup.create(
                    BootstrapSetup.create(
                            Optional.empty(),
                            Optional.of(applicationConfig),
                            Optional.empty())),
            Option.apply(uncaughtExceptionHandler));
}


private static RobustActorSystem create(
        String name,
        ActorSystemSetup setup,
        Option<Thread.UncaughtExceptionHandler> uncaughtExceptionHandler) {
    final Optional<BootstrapSetup> bootstrapSettings = setup.get(BootstrapSetup.class);
    final ClassLoader classLoader = RobustActorSystem.class.getClassLoader();
    final Config appConfig =
            bootstrapSettings
                    .map(BootstrapSetup::config)
                    .flatMap(RobustActorSystem::toJavaOptional)
                    .orElseGet(() -> ConfigFactory.load(classLoader));
    final Option<ExecutionContext> defaultEC =
            toScalaOption(
                    bootstrapSettings
                            .map(BootstrapSetup::defaultExecutionContext)
                            .flatMap(RobustActorSystem::toJavaOptional));

    final RobustActorSystem robustActorSystem =
            new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {
                @Override
                public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
                    return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);
                }
            };
    robustActorSystem.start();
    return robustActorSystem;
}



private static RobustActorSystem create(
        String name,
        ActorSystemSetup setup,
        Option<Thread.UncaughtExceptionHandler> uncaughtExceptionHandler) {
    final Optional<BootstrapSetup> bootstrapSettings = setup.get(BootstrapSetup.class);
    final ClassLoader classLoader = RobustActorSystem.class.getClassLoader();
    final Config appConfig =
            bootstrapSettings
                    .map(BootstrapSetup::config)
                    .flatMap(RobustActorSystem::toJavaOptional)
                    .orElseGet(() -> ConfigFactory.load(classLoader));
    final Option<ExecutionContext> defaultEC =
            toScalaOption(
                    bootstrapSettings
                            .map(BootstrapSetup::defaultExecutionContext)
                            .flatMap(RobustActorSystem::toJavaOptional));

    final RobustActorSystem robustActorSystem =
            new RobustActorSystem(name, appConfig, classLoader, defaultEC, setup) {
                @Override
                public Thread.UncaughtExceptionHandler uncaughtExceptionHandler() {
                    return uncaughtExceptionHandler.getOrElse(super::uncaughtExceptionHandler);
                }
            };
    robustActorSystem.start();
    return robustActorSystem;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/900015.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

特斯拉Optimus:展望智能生活新篇章

近日&#xff0c;特斯拉举办了 "WE ROBOT" 发布会&#xff0c;发布会上描绘的未来社会愿景&#xff0c;让无数人为之向往。在这场吸引全球无数媒体的直播中&#xff0c;特斯拉 Optimus 人形机器人一出场就吸引了所有观众的关注。从多家媒体现场拍摄的视频可以看出来&…

Ubuntu 上安装 Redmine 5.1 指南

文章目录 官网安装文档&#xff1a;命令步骤相关介绍GemRubyRailsBundler 安装 Redmine更新系统包列表和软件包&#xff1a;安装必要的依赖&#xff1a;安装 Ruby&#xff1a;安装 bundler下载 Redmine 源代码&#xff1a;安装 MySQL配置 Redmine 的数据库配置文件&#xff1a;…

Java 基于 poi 和 itextpdf 实现 excel 转 pdf

目录 问题 实现思路 pom Excel2PDFUtil Excel2PDFUtilTest 输出结果 问题 工作中遇到一个需求&#xff0c;需要实现 excel 文件的预览&#xff0c;实现的思路就是将 excel 转成 pdf 文件&#xff0c;上传到文件服务器上得到文件地址&#xff0c;预览时只需要返回 pdf 预…

Uni-App-02

条件编译 条件编译概念 不同的运行平台终归有些专有的特性&#xff0c;无法实现跨平台完全兼容&#xff0c;例如&#xff1a;微信小程序导航栏右上角的关闭图标。 uni-app提供了一种“条件编译”机制&#xff0c;可以针对特定的平台编译执行特定的代码&#xff0c;否则不执行。…

[JAVAEE] 线程安全的案例(一)-单例模式

目录 一. 单例模式 二. 单例模式的使用时机 三. 单例模式的关键代码 四. 单例模式的几种实现方式 4.1 饿汉方式(急) 4.2 懒汉模式(缓) a. 解决原子性的问题 b. 解决程序运行效率低下的问题 c. 解决指令重排序的问题(其次是为了解决内存可见性的问题) 五. 总结 一. …

【大模型实战篇】大模型分词算法BPE(Byte-Pair Encoding tokenization)及代码示例

词元化是针对自然语言处理任务的数据预处理中一个重要步骤&#xff0c;目的是将原始文本切分成模型可以识别和处理的词元序列。在大模型训练任务中&#xff0c;就是作为大模型的输入。传统的自然语言处理方法&#xff0c;如基于条件随机场的序列标注&#xff0c;主要采用基于词…

Nest.js 实战 (十五):前后端分离项目部署的最佳实践

☘️ 前言 本项目是一个采用现代前端框架 Vue3 与后端 Node.js 框架 Nest.js 实现的前后端分离架构的应用。Vue3 提供了高性能的前端组件化解决方案&#xff0c;而 Nest.js 则利用 TypeScript 带来的类型安全和模块化优势构建了一个健壮的服务端应用。通过这种技术栈组合&…

phpstorm中使用FTP功能和自动上传配置介绍

phpstorm中使用FTP功能和自动上传配置介绍 一、引言 PHPStorm 是一款强大的 PHP IDE&#xff0c;它提供了许多便捷的功能来提高开发效率。其中&#xff0c;内置的 FTP 功能允许开发者直接在 IDE 中上传文件到服务器&#xff0c;而自动上传配置则可以进一步简化这一过程。本文…

ISUP协议视频平台EasyCVR私有化视频平台视频汇聚/存储系统怎么选?

一、EasyCVR视频监控存储系统的核心优势 TSINGSEE青犀EasyCVR视频汇聚平台是一个具备高度集成化、智能化的视频监控汇聚管理平台&#xff0c;拥有远程视频监控、录像、存储、回放、语音对讲、云台控制、告警等多项核心功能。该系统采用先进的网络传输技术&#xff0c;支持高清…

Servlet(三)-------Cookie和session

一.Cookie和Session Cookie和Session都是用于在Web应用中跟踪用户状态的技术。Cookie是存储在用户浏览器中的小文本文件&#xff0c;由服务器发送给浏览器。当用户再次访问同一网站时&#xff0c;浏览器会把Cookie信息发送回服务器。例如&#xff0c;网站可以利用Cookie记住用…

轻松掌握Win10录屏技巧:四大神器推荐!

在Win10系统中&#xff0c;录屏功能的应用越来越广泛&#xff0c;无论是用于工作演示、在线教学还是游戏分享&#xff0c;一款好用的录屏软件都是必不可少的。今天&#xff0c;我们将推荐四款录屏工具&#xff01; 福昕录屏大师 直达链接&#xff1a;www.foxitsoftware.cn/RE…

字符串大小的比较

1.字符串中每一个字符都对应一个码值&#xff0c;字符串比较大小时从第一个字符开始比较出现结果时输出 如下图所示&#xff1a;

力扣之613.直线上的最近距离

文章目录 1. 613.直线上的最近距离1.1 题意1.2 准备数据1.3 题解1.4 结果截图 1. 613.直线上的最近距离 1.1 题意 表&#xff1a; Point ----------------- | Column Name | Type | ----------------- | x | int | ----------------- 在SQL中&#xff0c;x是该表的主键列。 …

《计算机视觉》—— 换脸

效果如下&#xff1a; 完整代码&#xff1a; import cv2 import dlib import numpy as npJAW_POINTS list(range(0, 17)) RIGHT_BROW_POINTS list(range(17, 22)) LEFT_BROW_POINTS list(range(22, 27)) NOSE_POINTS list(range(27, 35)) RIGHT_EYE_POINTS list(range(36…

PON架构(全光网络)

目前组网架构 世界上有一种最快的速度又是光&#xff0c;以前传统以太网络规划满足不了现在的需求。 有线网 无线网 全光网络方案 场景 全光网络分类 以太全光网络 PON&#xff08;Pas-sive-Optical Network 无源光网络&#xff09; 再典型的中大型高校网络中 推荐万兆入…

电脑技巧:Rufus——最佳USB启动盘制作工具指南

目录 一、功能强大&#xff0c;兼容性广泛 二、界面友好&#xff0c;操作简便 三、快速高效&#xff0c;高度可定制 四、安全可靠&#xff0c;社区活跃 在日常的电脑使用中&#xff0c;无论是为了安装操作系统、修复系统故障还是进行其他需要可引导媒体的任务&#xff0c;拥…

目前最新 Reflector V11.1.0.2067版本 .NET 反编译软件

目前最新 Reflector V11.1.0.2067版本 .NET 反编译软件 一、简介二、.NET Reflector的主要功能包括&#xff1a;1. **反编译**: 反编译是将已编译的.NET程序集&#xff08;如.dll或.exe文件&#xff09;转换回可读的源代码。这使得开发者可以查看和学习第三方库的实现细节&…

【分立元件】电阻的额定电压和最高电压

在文章:【分立元件】贴片电阻的额定功率中我们讲到使用电阻器时,不仅要注意额定功率,还要注意电压相关的一些项目。 本文我们将对与电阻基本参数关联的额定电压和元件最高电压这两个术语及其定义(包括它们之间的关系)进行解说。 额定电压 如下所示国巨片式电阻规…

排序算法 —— 希尔排序

目录 1.希尔排序的由来 2.希尔排序的思想 3.希尔排序的实现 实现分析 实现代码 代码优化 4.希尔排序的总结 1.希尔排序的由来 希尔排序是对直接插入排序的优化。在直接插入排序算法中&#xff0c;如果数据是有序or接近有序的时候&#xff0c;直接插入排序算法的时间复杂…

跨时钟域处理(单bit)_2024年10月21日

慢时钟域同步到快时钟域&#xff1a;打两拍 在快时钟域clk下对慢时钟域信号进行打两拍&#xff08;亚稳态概率很低&#xff09; 脉冲宽度改变&#xff0c;但不影响同步结果 快时钟域同步到慢时钟域&#xff08;两种方法&#xff09; ① 脉冲展宽同步 在快时钟域clk下对快时…