Flink on k8s 客户端提交任务源码分析
当我们需要在代码中提交Flink job到kubernetes上时,需要如何做呢?要引入什么第三方依赖?需要提供什么内容?flink是如何将job提交到k8s上的?经过了什么样的流程,内部有什么细节?
本文将通过源码来带你对上面的问题一探究竟,加深我们对Flink on k8s作业提交流程的理解,以及后续在实际的使用场景中如何自己在代码中编写提交flink job的流程。
流程图
整体的提交流程图以及相关注释如下图所示,可以大致看到提交流程中所需要的一些主要步骤和组件。接下来,我们一步步地来分析其提交流程:
一、构建FlinkConfig
既然k8s暴露了客户端提供给用户能够提交资源到其服务上,那么也就会提供一些配置参数来供用户自定义配置,提交Flink任务也是一样。提交Flink job到k8s上部署时,实际最终肯定是要生成一个对应的yaml文件,而yaml属性中肯定会存在一些可以由用户指定,或者需要配置的部分(比如容器名称、JVM参数、request/limit等)。这些属性值在构建yaml的过程中,都会封装到Flink定义的Configuration配置类中。在下游的多个方法中,都会接收Configuration对象来执行诸如创建FlinkKubeClient客户端,构建Pod/Deployment等操作。为此,我们首先来构造一个Configuration对象,并来看看可以配置哪些内容。
首先来看一下Configuration类的结构:
可以看到其核心主要就是包含了一个HashMap结构的confData属性,用来保存具体的key/value属性值。同时,该类提供了一个set方法用来向confData属性中添加键值对:
public <T> Configuration set(ConfigOption<T> option, T value) {
boolean canBePrefixMap = ConfigurationUtils.canBePrefixMap(option);
this.setValueInternal(option.key(), value, canBePrefixMap);
return this;
}
因此,我们可以使用上面的set方法来添加配置属性,比如:
// set common parameter
flinkConfig
// 设置Flink应用的名称,通常用于在Flink UI或日志中识别运行的作业。
.set(PipelineOptions.NAME, submitRequest.effectiveAppName)
// 设置Flink的部署模式。比如,它可能是local、yarn-per-job、kubernetes-application
.set(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
// 配置savepoint的路径
.set(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
// 配置应用程序的主入口类
.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
// 配置应用程序的命令行参数
.set(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
// 配置Flink作业的固定ID
.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)
// extract from submitRequest
flinkConfig
// 配置Flink cluster id
.set(KubernetesConfigOptions.CLUSTER_ID, submitRequest.k8sSubmitParam.clusterId)
// 配置Flink job 提交到k8s时的命名空间
.set(KubernetesConfigOptions.NAMESPACE, submitRequest.k8sSubmitParam.kubernetesNamespace)
// 配置rest service的暴露类型 LoadBalancer、ClusterIP、NodePort
.set(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
covertToServiceExposedType(submitRequest.k8sSubmitParam.flinkRestExposedType))
// 配置Flink conf的路径
if (!flinkConfig.contains(DeploymentOptionsInternal.CONF_DIR)) {
flinkConfig.set(DeploymentOptionsInternal.CONF_DIR, s"${submitRequest.flinkVersion.flinkHome}/conf")
}
// 添加Flink容器镜像标签
flinkConfig.set(KubernetesConfigOptions.CONTAINER_IMAGE, buildResult.flinkImageTag)
// 配置k8s config文件路径
flinkConfig.set(KubernetesConfigOptions.KUBE_CONFIG_FILE, "~/.kube/config
")
...
配置JVM参数:
// 自定义方法向Configuration对象中写入JVM参数
def setJvmOptions(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
if (MapUtils.isNotEmpty(submitRequest.properties)) {
submitRequest.properties.foreach(x => {
val k = x._1.trim
val v = x._2.toString
if (k == CoreOptions.FLINK_JVM_OPTIONS.key()) {
// 配置应用程序运行时的全局JVM参数
flinkConfig.set(CoreOptions.FLINK_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_JM_JVM_OPTIONS.key()) {
// 配置JobManager运行时的JVM参数
flinkConfig.set(CoreOptions.FLINK_JM_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_HS_JVM_OPTIONS.key()) {
// 配置HistoryServer运行时的JVM参数
flinkConfig.set(CoreOptions.FLINK_HS_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_TM_JVM_OPTIONS.key()) {
// 配置TaskManager运行时的JVM参数
flinkConfig.set(CoreOptions.FLINK_TM_JVM_OPTIONS, v)
} else if (k == CoreOptions.FLINK_CLI_JVM_OPTIONS.key()) {
// 配置Flink CLI(Command Line Interface)运行时的JVM参数
flinkConfig.set(CoreOptions.FLINK_CLI_JVM_OPTIONS, v)
}
})
}
}
上面我们展示了向flinkConfig对象中添加flink相关参数、k8s环境相关参数、JVM相关参数的案例。这些参数后续会在下游的其他方法中被读取使用,用来创建相应资源的对象。
有了Configuration配置类后,就可以基于此来执行后续的一系列操作了。
二、创建KubernetesClusterDescriptor
KubernetesClusterDescriptor 是 Flink 中一个用于描述 Kubernetes 集群的类。它实现了 ClusterDescriptor 接口,该接口是表示集群描述的通用接口,它定义了获取、部署以及终止具体集群(如Yarn, Mesos, Standalone 或 Kubernetes)的方法。
KubernetesClusterDescriptor类图结构如下所示:
这个个类在Flink的源代码中负责与Kubernetes集群进行交互,包括在该集群上部署或撤销Flink集群。
其主要功能如下:
- 部署Flink集群:这个类的deploySessionCluster()、deployJobCluster()、deployApplicationCluster()方法可以分别创建一个Session模式集群和一个Per-Job模式集群(k8s不支持 Flink 1.12版本)、Application模式集群。
- 停止Flink集群:通过调用killCluster()方法,可以停止在Kubernetes集群上运行的Flink集群。
- 检索Flink集群信息:retrieve()方法可以根据提供的集群ID检索一个现有的Flink集群。
假设此次我们需要以Application模式部署Flink集群,因此我们就要创建一个KubernetesClusterDescriptor对象,然后调用deployApplicationCluster()方法执行创建。
通过上面的类图可以看到,KubernetesClusterDescriptor的构造方法需要传入Configuration对象和FlinkKubeClient对象。Configuration对象我们刚刚已经创建并初始化了,接下来就需要创建一个FlinkKubeClient对象了。
Flink提供了对应了工厂类FlinkKubeClientFactory来创建FlinkKubeClient对象,因此,我们只需要调用如下方法进行创建:
FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client"));
进入到fromConfiguration方法中看一下做了什么事情:
public FlinkKubeClient fromConfiguration(Configuration flinkConfig, String useCase) {
final Config config;
// Kubernetes 配置文件中所需的上下文,用于配置 Kubernetes 客户端以与集群交互。 如果配置了多个上下文并且想要在不同的 Kubernetes 集群/上下文上管理不同的 Flink 集群,这可能会很有帮助
final String kubeContext = flinkConfig.getString(KubernetesConfigOptions.CONTEXT);
if (kubeContext != null) {
LOG.info("Configuring kubernetes client to use context {}.", kubeContext);
}
// 判断是否指定了kubernetes.config.file参数,如果指定了则去相应路径加载k8s config配置文件。
final String kubeConfigFile =
flinkConfig.getString(KubernetesConfigOptions.KUBE_CONFIG_FILE);
if (kubeConfigFile != null) {
LOG.debug("Trying to load kubernetes config from file: {}.", kubeConfigFile);
try {
// If kubeContext is null, the default context in the kubeConfigFile will be used.
// Note: the third parameter kubeconfigPath is optional and is set to null. It is
// only used to rewrite
// relative tls asset paths inside kubeconfig when a file is passed, and in the case
// that the kubeconfig
// references some assets via relative paths.
config =
Config.fromKubeconfig(
kubeContext,
FileUtils.readFileUtf8(new File(kubeConfigFile)),
null);
} catch (IOException e) {
throw new KubernetesClientException("Load kubernetes config failed.", e);
}
} else {
// 如果没有指定,则会去查找默认的k8s config路径,默认在~/.kube/config
LOG.debug("Trying to load default kubernetes config.");
config = Config.autoConfigure(kubeContext);
}
// 设置 Kubernetes 的命名空间,如果配置文件中没有设置 KubernetesConfigOptions.NAMESPACE,则默认为 "default"。
final String namespace = flinkConfig.getString(KubernetesConfigOptions.NAMESPACE);
LOG.debug("Setting namespace of Kubernetes client to {}", namespace);
config.setNamespace(namespace);
// 使用 config 创建了一个 DefaultKubernetesClient 实例。这个客户端可以用来和 Kubernetes API server 交互。
final NamespacedKubernetesClient client = new DefaultKubernetesClient(config);
// Kubernetes 客户端执行阻塞 IO 操作所使用的 IO 执行器池的大小(比如start/stop TaskManager Pod)
final int poolSize =
flinkConfig.get(KubernetesConfigOptions.KUBERNETES_CLIENT_IO_EXECUTOR_POOL_SIZE);
// 使用上述创建的 Kubernetes Client 和线程池创建并返回一个 Fabric8FlinkKubeClient 对象,这就是Flink用来与Kubernetes进行交互的客户端。
return new Fabric8FlinkKubeClient(
flinkConfig, client, createThreadPoolForAsyncIO(poolSize, useCase));
}
上述代码注释详细介绍了每一步的主要功能。这里的Config类是在`io.fabric8.kubernetes.client;
包下,包含了k8s config配置文件中的相关属性,比如apiVersion、caCertData(对应certificate-authority-data)、clientKeyData(对应client-key-data)。k8s会调用KubeConfigUtils.parseConfigFromString将读取出来的string格式配置文件加载成Config对象:
public static Config parseConfigFromString(String contents) throws IOException {
ObjectMapper mapper = Serialization.yamlMapper();
return (Config)mapper.readValue(contents, Config.class);
}
接下来会调用Config类下的loadFromKubeconfig()方法对配置文件进行进一步的解析和处理,代码这里就不列出了,有兴趣的可以单独深挖一下源码。
如果没有显式的指定k8s config配置文件路径,则会默认采用下面这种方式得到,对于Mac或linux系统下,默认的路径就是~/.kube/config。
String fileName = Utils.getSystemPropertyOrEnvVar("kubeconfig", (new File(getHomeDir(), ".kube" + File.separator + "config")).toString());
最终,上面代码利用Config对象创建了一个DefaultKubernetesClient 对象,并封装在了Fabric8FlinkKubeClient 对象的client属性中。
上面出现了两个k8s的Client客户端,这里简单描述一下它们的区别:
DefaultKubernetesClient,FlinkKubeClient 和 Fabric8FlinkKubeClient 都与 Kubernetes 集群的交互有关,但它们的角色和作用有所不同。
- DefaultKubernetesClient:这是 fabric8 提供的 Kubernetes Java 客户端库中的一个类。它是 Kubernetes Java 客户端的主要实现,负责与 Kubernetes API Server 进行基础交互,如创建、更新、查询和删除 Kubernetes 资源。
- FlinkKubeClient:这是 Flink 中定义的一个接口,目的是为了抽象与 Kubernetes 集群交互的细节。通过这个接口,Flink 可以在 Kubernetes 集群上执行特定于 Flink 的操作,例如创建和删除 Flink 集群,查看 Flink pod 的状态等。
- Fabric8FlinkKubeClient:这是一个实现了 FlinkKubeClient 接口的类,这个类将 Flink 的需求和 fabric8 Kubernetes 客户端的功能结合在一起,使得 Flink 可以用专门针对它的方式在 Kubernetes 上执行操作。
总的来说,DefaultKubernetesClient 是用来执行一般性 Kubernetes 操作的客户端,FlinkKubeClient 是Flink为了在 Kubernetes 上运行而定义的一套接口,而 Fabric8FlinkKubeClient 则是将 Flink 的需求通过基于 fabric8 的 Kubernetes 客户端具体实现出来的类。
目前,有了Configuration对象和FlinkKubeClient对象之后,就可以创建KubernetesClusterDescriptor实例了。实际上,我们可以根据KubernetesClusterClientFactory工厂类来创建,实际的执行流程仍然是上面介绍的那些。
@Override
public KubernetesClusterDescriptor createClusterDescriptor(Configuration configuration) {
checkNotNull(configuration);
if (!configuration.contains(KubernetesConfigOptions.CLUSTER_ID)) {
final String clusterId = generateClusterId();
configuration.setString(KubernetesConfigOptions.CLUSTER_ID, clusterId);
}
return new KubernetesClusterDescriptor(
configuration,
FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client"));
}
三、执行deployApplicationCluster()方法
在第二步介绍KubernetesClusterDescriptor时提到,该描述器可以用于k8s集群交互,部署Flink集群。因此,假设本次是要以Application模式部署Flink集群,所以使用到的方法是deployApplicationCluster()方法。方法源代码如下所示:
@Override
public ClusterClientProvider<String> deployApplicationCluster(
final ClusterSpecification clusterSpecification,
final ApplicationConfiguration applicationConfiguration)
throws ClusterDeploymentException {
// 检查是否存在clusterId同名的Flink集群,如果有则报错
if (client.getRestService(clusterId).isPresent()) {
throw new ClusterDeploymentException(
"The Flink cluster " + clusterId + " already exists.");
}
checkNotNull(clusterSpecification);
checkNotNull(applicationConfiguration);
// 获取Flink的部署模式并判断是否合法
final KubernetesDeploymentTarget deploymentTarget =
KubernetesDeploymentTarget.fromConfig(flinkConfig);
if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) {
throw new ClusterDeploymentException(
"Couldn't deploy Kubernetes Application Cluster."
+ " Expected deployment.target="
+ KubernetesDeploymentTarget.APPLICATION.getName()
+ " but actual one was \""
+ deploymentTarget
+ "\"");
}
applicationConfiguration.applyToConfiguration(flinkConfig);
// No need to do pipelineJars validation if it is a PyFlink job.
// 如果这不是 Python 应用(由 PackagedProgramUtils.isPython() 判断),代码将验证 pipelineJars。pipelineJars 是用户指定的作业包(jar包)路径,对于Application模式,只应存在一个 jar 包。
if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
|| PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
final List<File> pipelineJars =
KubernetesUtils.checkJarFileForApplicationMode(flinkConfig);
Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
}
// 调用 deployClusterInternal 方法部署 Flink 集群。KubernetesApplicationClusterEntrypoint 是作业集群的入口类。
final ClusterClientProvider<String> clusterClientProvider =
deployClusterInternal(
KubernetesApplicationClusterEntrypoint.class.getName(),
clusterSpecification,
false);
// 集群创建完成后,通过 getClusterClient 获取 ClusterClient.
try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
LOG.info(
"Create flink application cluster {} successfully, JobManager Web Interface: {}",
clusterId,
clusterClient.getWebInterfaceURL());
}
return clusterClientProvider;
}
上面方法中的主要核心逻辑是执行deployClusterInternal()方法。其他部分首先判断clusterId是否存在、部署模式是否正确、用户指定的jar包数是否合法(只能有一个)。这些验证通过后,就会调用 deployClusterInternal 方法部署 Flink 集群,其中传入的参数KubernetesApplicationClusterEntrypoint.class.getName(),KubernetesApplicationClusterEntrypoint 是作业集群的入口类。该参数用于后续指定jobmanager的入口类。
四、执行deployClusterInternal()方法
进入deployClusterInternal()方法内部:
private ClusterClientProvider<String> deployClusterInternal(
String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
throws ClusterDeploymentException {
final ClusterEntrypoint.ExecutionMode executionMode =
detached
? ClusterEntrypoint.ExecutionMode.DETACHED
: ClusterEntrypoint.ExecutionMode.NORMAL;
flinkConfig.setString(
ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
// Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
KubernetesUtils.checkAndUpdatePortConfigOption(
flinkConfig,
HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
flinkConfig.get(JobManagerOptions.PORT));
}
try {
final KubernetesJobManagerParameters kubernetesJobManagerParameters =
new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
final FlinkPod podTemplate =
kubernetesJobManagerParameters
.getPodTemplateFilePath()
.map(
file ->
KubernetesUtils.loadPodFromTemplateFile(
client, file, Constants.MAIN_CONTAINER_NAME))
.orElse(new FlinkPod.Builder().build());
final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
podTemplate, kubernetesJobManagerParameters);
client.createJobManagerComponent(kubernetesJobManagerSpec);
return createClusterClientProvider(clusterId);
} catch (Exception e) {
try {
LOG.warn(
"Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.",
clusterId);
client.stopAndCleanupCluster(clusterId);
} catch (Exception e1) {
LOG.info(
"Failed to stop and clean up the Kubernetes cluster \"{}\".",
clusterId,
e1);
}
throw new ClusterDeploymentException(
"Could not create Kubernetes cluster \"" + clusterId + "\".", e);
}
}
这个方法是在 Flink 的 Kubernetes 集群上实际部署 Flink 集群的核心代码。以下是这段代码执行的主要步骤:
- 设置执行模式(Execution Mode):根据输入的 detached 标识确定是否为分离模式,在分离模式下,Flink 集群和调用它的客户端会在启动后立即分离,否则为常规模式。将执行模式设置到 Flink 配置里。
- 设置入口点类: 入口点类指定了集群启动后运行的Java类。这个类一般是 KubernetesApplicationClusterEntrypoint 或 KubernetesSessionClusterEntrypoint。这个值设置到 Flink 配置里。
- 确认并设置相关网络端口:确认并设置了Rpc端口,BLOB服务器端口,REST绑定端口和TaskManager Rpc端口。这是为了确保集群的正常运行。
- 处理高可用性模式: 如果开启了高可用性模式,设置相关的配置并检查 JobManager 端口。
- 创建JobManager参数和Pod模板:创建描述 JobManager 的参数类 KubernetesJobManagerParameters,然后创建Pod模板。如果指定了Pod模板文件,将从文件加载一个Pod模板,否则使用一个默认的空的 Pod 模板。
- 构建并创建JobManager组件:使用 Pod 模板和 JobManager 参数来构建 Kubernetes JobManager 规格描述,并创建 JobManager 组件。
- 创建并返回ClusterClientProvider:创建并返回一个 ClusterClientProvider 实例。ClusterClientProvider 可以用于获取 ClusterClient,并进一步操作所创建的 Flink 集群。
- 错误处理:如果在创建过程中发生任何错误,会试图停止并清理已创建的资源,然后抛出 ClusterDeploymentException 异常。
⠀上面这段代码有两个核心处理步骤:
- KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
podTemplate, kubernetesJobManagerParameters); - client.createJobManagerComponent(kubernetesJobManagerSpec);
我们在第一部分的时候提到过,k8s集群的资源部署都是通过yaml配置文件来执行的,因此,flink job提交到k8s上实际上也是通过创建相应的yaml来执行的。在第一部分我们创建了Configuration对象并添加了多种类型参数,这些参数最终会加载到需要创建的资源(比如Deployment)对应的yaml文件中。而buildKubernetesJobManagerSpecification()方法就是实现上述过程的核心方法,描述了资源yaml的创建过程。
五、KubernetesStepDecorator装饰接口
KubernetesStepDecorator 是 Flink Kubernetes 集群部署流程中的一个关键接口。它封装了在创建 Flink Kubernetes 集群时,如何修改 Pod 规格并且新增 Kubernetes 附加资源的步骤。
从总体上看,它有两个主要功能:
- 装饰 Flink Pod: 通过 decorateFlinkPod 方法,接口实现可以在 Flink Pod(一个用于创建 Kubernetes Pod 的模板)的基础上添加或修改一些配置。例如添加新的环境变量、修改容器的启动参数等。
- 创建附加的 Kubernetes 资源:通过 buildAccompanyingKubernetesResources 方法,接口实现可以创建一些附加的 Kubernetes 资源,并与 Flink Pod 一起提交到 Kubernetes 集群。例如,在部署 Flink JobManager 时,可能会创建与 JobManager Pod 相关联的 Service 对象,以便可以从外部访问 JobManager。
这个接口的具体实现类将决定具体要进行哪些修改或添加哪些额外的资源。典型的 KubernetesStepDecorator 实现有 EnvSecretsDecorator(添加环境变量),MountSecretsDecorator(挂载 Secret 对象),CmdJobManagerDecorator(修改 JobManager 启动命令)等。这些实现会分别根据不同的需要来装饰 Flink Pod 和创建附加资源。
Flink定义了多种装饰类来实现不同资源、不同组件的属性加载。接下来,我们就来看一下buildKubernetesJobManagerSpecification()方法中使用了哪些实现类,是如何来构造yaml文件的:
public static KubernetesJobManagerSpecification buildKubernetesJobManagerSpecification(
FlinkPod podTemplate, KubernetesJobManagerParameters kubernetesJobManagerParameters)
throws IOException {
FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
List<HasMetadata> accompanyingResources = new ArrayList<>();
final KubernetesStepDecorator[] stepDecorators =
new KubernetesStepDecorator[] {
new InitJobManagerDecorator(kubernetesJobManagerParameters),
new EnvSecretsDecorator(kubernetesJobManagerParameters),
new MountSecretsDecorator(kubernetesJobManagerParameters),
new CmdJobManagerDecorator(kubernetesJobManagerParameters),
new InternalServiceDecorator(kubernetesJobManagerParameters),
new ExternalServiceDecorator(kubernetesJobManagerParameters),
new HadoopConfMountDecorator(kubernetesJobManagerParameters),
new KerberosMountDecorator(kubernetesJobManagerParameters),
new FlinkConfMountDecorator(kubernetesJobManagerParameters),
new PodTemplateMountDecorator(kubernetesJobManagerParameters)
};
for (KubernetesStepDecorator stepDecorator : stepDecorators) {
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
accompanyingResources.addAll(stepDecorator.buildAccompanyingKubernetesResources());
}
final Deployment deployment =
createJobManagerDeployment(flinkPod, kubernetesJobManagerParameters);
return new KubernetesJobManagerSpecification(deployment, accompanyingResources);
}
可以看到,Flink定义了诸如InitJobManagerDecorator、CmdJobManagerDecorator、InternalServiceDecorator、FlinkConfMountDecorator等多种装饰实现类,来分别装饰pod的不同配置。接下来,挑选几个比较重要的装饰类来看一下它们完成了什么功能。
InitJobManagerDecorator
首先分析它的decorateFlinkPod()方法:
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
// 创建一个PodBuilder,是Pod的Builder构造器。这里getPodWithoutMainContainer表示获取的Pod内容中不包括主容器部分(也就是spec.containers),这一部分属性会单独进行配置.
final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
// Overwrite fields
// 覆盖一些 Kubernetes Pod 的字段,例如定义了 Pod 的 service account 名称
final String serviceAccountName =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
kubernetesJobManagerParameters.getServiceAccount(),
KubernetesUtils.getServiceAccount(flinkPod),
"service account");
if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() != null) {
logger.info(
"The restart policy of JobManager pod will be overwritten to 'always' "
+ "since it is controlled by the Kubernetes deployment.");
}
basicPodBuilder
.withApiVersion(API_VERSION) // 绑定ApiVersion属性值
.editOrNewSpec() // 编辑或创建一个Spec标签
.withServiceAccount(serviceAccountName) // 绑定spec.serviceAccount值
.withServiceAccountName(serviceAccountName) // 绑定spec.serviceAccountName值
.endSpec(); // 调用build()方法生成PodSpec对象
// Merge fields
// 跟上面的过程同理,创建相应的标签,绑定标签下的属性值,在end方法中调用Builder.build()方法生成相应的标签对象
basicPodBuilder
.editOrNewMetadata()
.addToLabels(kubernetesJobManagerParameters.getLabels())
.addToAnnotations(kubernetesJobManagerParameters.getAnnotations())
.endMetadata()
.editOrNewSpec()
.addToImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
.addToNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
.addAllToTolerations(
kubernetesJobManagerParameters.getTolerations().stream()
.map(e -> KubernetesToleration.fromMap(e).getInternalResource())
.collect(Collectors.toList()))
.endSpec();
// 单独构造containers标签下的属性值
final Container basicMainContainer = decorateMainContainer(flinkPod.getMainContainer());
// 根据绑定值生成一个FlinkPod对象
return new FlinkPod.Builder(flinkPod)
.withPod(basicPodBuilder.build())
.withMainContainer(basicMainContainer)
.build();
}
方法 decorateFlinkPod主要作用是装饰(也就是定义或修改)部署到 Kubernetes 集群的 Flink JobManager Pod。在上面的方法中,首先创建了一个PodBuilder构造器,PodBuilder 是一个用于构建 Kubernetes Pod 对象的实用程序类。它实现了 Builder 设计模式,该模式提供了一种链式调用方法来构建复杂对象,这种方式对于需要大量配置属性元素的对象来说特别有用。
借助PodBuilder,可以利用withApiVersion、withKind、withMetadata、withSpec、withStatus方法链式调用定义pod yaml中的不同标签属性值。
实际上对于每个标签,如metadata、spec、status,都会有一个Builder构造器与之对应,负责该标签下的属性定义和修改,比如ObjectMetaBuilder、PodSpecBuilder、PodStatusBuilder。每个标签都对应了一个实体类,包含了其在yaml文件中所能定义的所有属性值,比如以ObjectMeta类为例,其字段包含了yaml中metadata标签下的所有属性:
在该方法中,还调用了decorateMainContainer()方法,以同样的方式来配置主容器的标签属性。
private Container decorateMainContainer(Container container) {
final ContainerBuilder mainContainerBuilder = new ContainerBuilder(container);
// Overwrite fields
final String image =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.CONTAINER_IMAGE,
kubernetesJobManagerParameters.getImage(),
container.getImage(),
"main container image");
final String imagePullPolicy =
KubernetesUtils.resolveUserDefinedValue(
flinkConfig,
KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY,
kubernetesJobManagerParameters.getImagePullPolicy().name(),
container.getImagePullPolicy(),
"main container image pull policy");
final ResourceRequirements requirementsInPodTemplate =
container.getResources() == null
? new ResourceRequirements()
: container.getResources();
final ResourceRequirements requirements =
KubernetesUtils.getResourceRequirements(
requirementsInPodTemplate,
kubernetesJobManagerParameters.getJobManagerMemoryMB(),
kubernetesJobManagerParameters.getJobManagerCPU(),
Collections.emptyMap(),
Collections.emptyMap());
mainContainerBuilder
.withName(Constants.MAIN_CONTAINER_NAME)
.withImage(image)
.withImagePullPolicy(imagePullPolicy)
.withResources(requirements);
// Merge fields
mainContainerBuilder
.addAllToPorts(getContainerPorts())
.addAllToEnv(getCustomizedEnvs())
.addNewEnv()
.withName(ENV_FLINK_POD_IP_ADDRESS)
.withValueFrom(
new EnvVarSourceBuilder()
.withNewFieldRef(API_VERSION, POD_IP_FIELD_PATH)
.build())
.endEnv();
return mainContainerBuilder.build();
}
对于主容器的构建,也会有一个Builder构造器与之对应。实际上,在Fabric8 Kubernetes Java 客户端库中对于k8s中的每种资源,都提供了一个Builder构造器与之对应,并提供了采用链式调用来创建对应资源的方法。
之所以在装饰的过程中要将pod配置和主容器配置隔离开,原因之一可能是主容器的配置和其他 Pod 的配置(比如 Pod 元数据,节点选择器等)是两种不同层次的配置。主容器的配置主要关注容器级别的东西,比如容器的镜像,容器的环境变量,容器的启动命令等。而 Pod 的配置主要关注 Pod 级别的东西,如副本数量、重启策略等。这样区分能够增加了代码的清晰度和易理解性,同时如果在多个地方需要装饰主容器,那么使用一个封装好的 decorateMainContainer 方法可以提高代码复用性。
回顾一下上面的两个方法,主要实现的功能就是对要在 Kubernetes 集群上部署的 Flink Pod 和其主容器进行定制化修改和配置,Fabric8 Kubernete库提供了Builder构造器的方式方便快速完成上述配置过程。
FlinkConfMountDecorator
接下来,我们再来看一下另一个装饰类,该装饰类负责挂载log4j.properties、logback.xml、flink-conf配置到JobManager/TaskManager pod。挂载的方式是通过将上述配置信息包装到一个ConfigMap中,并定义在Pod的volumes.config下,然后在volumeMounts字段中进行挂载。
首先来看一下decorateFlinkPod()方法:
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
final Container mountedMainContainer =
new ContainerBuilder(flinkPod.getMainContainer())
.addNewVolumeMount()
.withName(FLINK_CONF_VOLUME)
.withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
.endVolumeMount()
.build();
return new FlinkPod.Builder(flinkPod)
.withPod(mountedPod)
.withMainContainer(mountedMainContainer)
.build();
}
private Pod decoratePod(Pod pod) {
final List<KeyToPath> keyToPaths =
getLocalLogConfFiles().stream()
.map(
file ->
new KeyToPathBuilder()
.withKey(file.getName())
.withPath(file.getName())
.build())
.collect(Collectors.toList());
keyToPaths.add(
new KeyToPathBuilder()
.withKey(FLINK_CONF_FILENAME)
.withPath(FLINK_CONF_FILENAME)
.build());
final Volume flinkConfVolume =
new VolumeBuilder()
.withName(FLINK_CONF_VOLUME)
.withNewConfigMap()
.withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
.withItems(keyToPaths)
.endConfigMap()
.build();
return new PodBuilder(pod)
.editSpec()
.addNewVolumeLike(flinkConfVolume)
.endVolume()
.endSpec()
.build();
}
private List<File> getLocalLogConfFiles() {
final String confDir = kubernetesComponentConf.getConfigDirectory();
final File logbackFile = new File(confDir, CONFIG_FILE_LOGBACK_NAME);
final File log4jFile = new File(confDir, CONFIG_FILE_LOG4J_NAME);
List<File> localLogConfFiles = new ArrayList<>();
if (logbackFile.exists()) {
localLogConfFiles.add(logbackFile);
}
if (log4jFile.exists()) {
localLogConfFiles.add(log4jFile);
}
return localLogConfFiles;
}
上面代码中,首先执行了decoratePod()方法。在decoratePod()方法中,getLocalLogConfFiles()方法首先找到Flink conf配置文件所在的路径,并获取其logback.xml文件和log4j.properties文件。然后再加入flink-conf.yaml文件,一起作为属性值放在volumes.config属性下,得到的结果可能如下所示:
volumes:
- configMap:
defaultMode: 420
items:
- key: logback-console.xml
path: logback-console.xml
- key: log4j-console.properties
path: log4j-console.properties
- key: flink-conf.yaml
path: flink-conf.yaml
name: flink-config-{clusterId}
name: flink-config-volume
配置好数据卷(volumes)属性后,decorateFlinkPod()方法需要指定数据卷在容器内的挂载路径,因此该方法实现在主容器中配置数据卷挂载属性(volumeMounts),得到的结果可能如下所示:
volumeMounts:
- mountPath: /opt/flink/conf
name: flink-config-volume
即把数据卷ConfigMap资源,挂载到容器内的/opt/flink/conf路径下。
在decorateFlinkPod()方法中配置好数据卷和数据卷挂载后,有一个问题,就是数据卷是怎么生成的呢?我们在pod中定义了名称为flink-config-{clusterId}的ConfigMap,但是这个ConfigMap是如何生成的呢?是自动生成的吗?
这个问题的答案,就在buildAccompanyingKubernetesResources()方法中:
public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
final String clusterId = kubernetesComponentConf.getClusterId();
final Map<String, String> data = new HashMap<>();
// 获取Flink conf的文件路径
final List<File> localLogFiles = getLocalLogConfFiles();
for (File file : localLogFiles) {
// 读取文件内容,与文件名一起放入data中
data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
}
// 将flinkConfig转成map,并删除一些不应该放到集群端的配置选项。
final Map<String, String> propertiesMap =
getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration());
// 转换成key: value的形式
data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap));
// 利用ConfigMapBuilder构造器生成ConfigMap
final ConfigMap flinkConfConfigMap =
new ConfigMapBuilder()
.withApiVersion(Constants.API_VERSION)
.withNewMetadata()
.withName(getFlinkConfConfigMapName(clusterId))
.withLabels(kubernetesComponentConf.getCommonLabels())
.endMetadata()
.addToData(data)
.build();
return Collections.singletonList(flinkConfConfigMap);
}
buildAccompanyingKubernetesResources()方法用来生成部署 Flink 作业需要的附加 Kubernetes 资源的,这里就是为了生成对应的ConfigMap资源。在该方法中,同样调用getLocalLogConfFiles()方法找到logback和log4j文件并加载成字符串,然后对flinkConfig配置类进行处理,最终生成key: value的yaml格式,然后一起放在ConfigMap的data属性下。
至此,我们就知道了pod中挂载的ConfigMap是如何生成的了。Flink在以Application模式部署比如Deployment时,会伴随的生成相应ConfigMap。Pod中通过挂载该ConfigMap,将flink-conf.yaml、log4j-console.properties、logback-console.xml文件挂载到pod的指定文件路径下,从而指定当前部署Flink集群的相关配置信息。
其余的装饰器就不再一一赘述了,有兴趣的可以深入源码逐个分析。经过所以指定的装饰类对Pod属性配置后,接下来,会以该Pod创建JobManager的Deployment。
createJobManagerDeployment
private static Deployment createJobManagerDeployment(
FlinkPod flinkPod, KubernetesJobManagerParameters kubernetesJobManagerParameters) {
final Container resolvedMainContainer = flinkPod.getMainContainer();
final Pod resolvedPod =
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.editOrNewSpec()
.addToContainers(resolvedMainContainer)
.endSpec()
.build();
final Map<String, String> labels = resolvedPod.getMetadata().getLabels();
return new DeploymentBuilder()
.withApiVersion(Constants.APPS_API_VERSION)
.editOrNewMetadata()
.withName(
KubernetesUtils.getDeploymentName(
kubernetesJobManagerParameters.getClusterId()))
.withAnnotations(kubernetesJobManagerParameters.getAnnotations())
.withLabels(kubernetesJobManagerParameters.getLabels())
.withOwnerReferences(
kubernetesJobManagerParameters.getOwnerReference().stream()
.map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource())
.collect(Collectors.toList()))
.endMetadata()
.editOrNewSpec()
.withReplicas(kubernetesJobManagerParameters.getReplicas())
.editOrNewTemplate()
.withMetadata(resolvedPod.getMetadata())
.withSpec(resolvedPod.getSpec())
.endTemplate()
.editOrNewSelector()
.addToMatchLabels(labels)
.endSelector()
.endSpec()
.build();
}
方法创建了一个 Kubernetes Deployment 对象,这个 Deployment 负责部署和管理 Flink 作业管理器(JobManager)的 Pod。
以下是 createJobManagerDeployment 方法的具体执行流程:
- 获取主容器:从输入的 FlinkPod 对象中获取主容器(resolvedMainContainer)。
- 构建 Pod:这一步主要是将获得的主容器添加到输入的 FlinkPod 中(去掉主容器部分的定义),从而生成一个新的、完整的 Pod(resolvedPod)。
- 提取 Pod 标签:从新构建的 Pod 中获取标签。
- 创建 Deployment:使用 DeploymentBuilder 构建一个新的 Kubernetes Deployment 对象。在这个构建过程中,包括了 Deployemt 的元数据(如名字、注解、标签和OwnerReference等)、具体设置(比如复制品数量)、模板(包括了已经创建好的 Pod 元数据和规格),以及选择器(依据提取的 Pod 标签生成)。
至此,我们已经生成好了最终需要部署的Deployment,接下来的问题就是如何提交该Deployment到kubernetes环境中进行部署了。
六、提交部署Deployment
在第二步创建KubernetesClusterDescriptor对象时,我们利用FlinkKubeClientFactory.getInstance().fromConfiguration(configuration, "client”));创建了一个Fabric8FlinkKubeClient的客户端实例,在该客户端实例中,提供了创建JobManager、TaskManager等资源的方法,类图如下所示:
接下来,我们先着重分析其中的createJobManagerComponent()方法。
public void createJobManagerComponent(KubernetesJobManagerSpecification kubernetesJMSpec) {
final Deployment deployment = kubernetesJMSpec.getDeployment();
final List<HasMetadata> accompanyingResources = kubernetesJMSpec.getAccompanyingResources();
// create Deployment
LOG.debug(
"Start to create deployment with spec {}{}",
System.lineSeparator(),
KubernetesUtils.tryToGetPrettyPrintYaml(deployment));
final Deployment createdDeployment =
this.internalClient.apps().deployments().create(deployment);
// Note that we should use the uid of the created Deployment for the OwnerReference.
setOwnerReference(createdDeployment, accompanyingResources);
this.internalClient.resourceList(accompanyingResources).createOrReplace();
}
通过上面的代码可以看到,创建Deployment是通过this.internalClient这个客户端实例来提交的,而其实这个客户端实例,就是我们之前在第二步中创建的DefaultKubernetesClient。我们在之前介绍过,这是 fabric8 提供的 Kubernetes Java 客户端库中的一个类, Kubernetes Java 客户端的主要实现,负责与 Kubernetes API Server 进行基础交互。因此,实际要将资源提交到kubernetes环境中时,需要使用该类来提交。
那么接下来,就是要探究该客户端在提交资源的过程中执行了什么流程,是用什么方式提交的?是用HTTP请求提交的吗? 接下来,让我们深入源码一探究竟。
this.internalClient.apps().deployments().create(deployment)执行到create()方法时经过了以下流程:
- apps():这个方法返回一个句柄,用来操作 Kubernetes 的 Apps API。Apps API 是 Kubernetes API 中的一个部分,主要用于管理 Kubernetes 中的各种应用资源,比如 Deployment,StatefulSet,DaemonSet 等。
- deployments():这个方法返回一个句柄,用来操作 Kubernetes 中的 Deployment 资源。Deployment 是 Kubernetes 提供的一种应用部署方式,它可以确保某个应用一直有指定数量的副本(Pod)在运行。
- create(deployment):这个方法是真正执行操作的地方。它创建一个新的 Deployment,这个 Deployment 的配置来自于传入的 deployment 参数。当这个方法被调用时,Kubernetes Java 客户端会将这个 Deployment 对象转化为一个 Kubernetes API 请求,然后发送到 Kubernetes 集群,请求集群创建这个 Deployment。
我们直接进入create方法中来探究一下真正执行操作时的流程。由于create()方法是InOutCreateable<I, O>接口方法,需要找到调用时的实现类。为此,我们先来看一下调用deployments()方法生成的DeploymentOperationImpl类,下面是其类图:
可以看到,这个类的继承关系还是比较复杂的。可以看到该类继承自BaseOperation父类,而BaseOperation也正是上面接口的实现类。因此,真正执行create()方法是在BaseOperation类的create()方法。来看一下这段代码:
public T create(T resource) {
try {
if (resource != null) {
return this.handleCreate(resource);
} else {
throw new IllegalArgumentException("Nothing to create.");
}
} catch (InterruptedException var3) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(this.forOperationType("create"), var3);
} catch (IOException | ExecutionException var4) {
throw KubernetesClientException.launderThrowable(this.forOperationType("create"), var4);
}
}
当资源不为空时,实际上会执行this.handleCreate(resource)方法:
protected T handleCreate(T resource) throws ExecutionException, InterruptedException, IOException {
// 如果resource的apiVersion值为空,则用当前对象的apiVersion值更新
this.updateApiVersion(resource);
return (HasMetadata)this.handleCreate(resource, this.getType());
}
继续进入到this.handleCreate方法中:
protected <T, I> T handleCreate(I resource, Class<T> outputType) throws ExecutionException, InterruptedException, IOException {
RequestBody body = RequestBody.create(JSON, JSON_MAPPER.writeValueAsString(resource));
Builder requestBuilder = (new Builder()).post(body).url(this.getResourceURLForWriteOperation(this.getResourceUrl(this.checkNamespace(resource), (String)null)));
return this.handleResponse(requestBuilder, outputType, Collections.emptyMap());
}
这里我们就能够看到实际的提交是通过HTTP的Post请求,首先将资源转换成Json String格式,然后放在请求体中利用Post请求发送到Kubernetes Server API中,请求执行资源的创建。
这里主要讲一下如何获取请求的url的过程:
- checkNamespace()方法:获取提交到k8s的namespace命名空间。在之前创建DefaultKubernetesClient客户端时,会根据传入的flinkConfig配置文件初始化一些客户端相关的参数,这里就包括了namespace、apiVersion、masterUrl。
因此,这里实际上就是根据传入的资源和客户端本身来判断资源提交时的命名空间。如果资源的metadata字段不为空,则取metadata下的namespace字段,如果该字段值也不为空,则提交资源的命名空间为该字段值指定的。否则,则是客户端本身初始化时,从flinkConfig中获取的命名空间值。如果两者都为空,则会报错。 - getResourceUrl()方法:获取当前命名空间下的kubernetes server 请求路径。
- getRootUrl():获取底层Url,是由this.config.masterUrl/apis/apiGroupName/apiGroupVersion拼接而成。这里masterUrl实际上是k8s config配置文件中的cluster.server指定的kubernetes服务路径。
- getNamespacedUrl():在上面rootUrl上拼接上命名空间/namespace/${namespace}。然后再拼接上部署的资源类型,比如deployments、pod等。
- 最终getResourceUrl会生成一个诸如https://masterUrl:port/apis/apiGroupName/apiGroupVersion/namespace/${namespace}/deployments的请求路径。
最终,DefaultKubernetesClient客户端会利用HTTP请求到上面构造的请求路径实现创建资源的功能。在执行创建Deployment之后,还有一些伴随资源需要创建,比如上面说的ConfigMap,以及InternalService和ExternalService。
创建这些资源的过程与上面的流程类似,最终仍然也是客户端利用HTTP请求向k8s集群发送创建请求。这里简单介绍一下为什么要创建伴随的HeadlessService和RestService。
- Headless Service(Internal Service):Headless Service 在 Kubernetes 中是一个特殊的服务,它没有负载均衡,并且不会分配 ClusterIP。它通常被用于服务发现,并且可以直接返回后端 Pod 的 IP 地址。在 Flink 部署到 Kubernetes 中,Headless Service 通常用于 JobManager 和 TaskManager 间的通信。JobManager 需要知道所有可用的 TaskManager,而 Headless Service 则提供了一个 DNS 来解析到所有 TaskManager 的地址。
- Rest Service(External Service):通过这种服务对外暴露 Flink JobManager 的 RESTful 接口。由于 Regular Service 提供了负载均衡和一个 ClusterIP,因此它可以被用来从 Kubernetes 集群外部访问 Flink JobManager,例如提交 Flink Job,检查 Job 状态等。另外,这个服务如果类型被设置为 LoadBalancer 或者 NodePort,那么它甚至可以从整个网络访问。
总结
至此,本文介绍了Flink on K8s模式部署时,是如何将任务提交到kubernetes集群上的,这一流程经历了哪些步骤,分别实现了怎样的功能,如何构建出需要部署的资源,以及最终资源是通过什么方式提交到kubernetes集群的。
了解这一过程有助于帮助我们理解Flink On K8s的提交流程,以及当需要自己在代码中提交Flink任务时需要哪些步骤,甚至可以修改源码来实现自定义的资源配置过程。