Flink operator实现自动扩缩容

官网文档位置:

1.Autoscaler | Apache Flink Kubernetes Operator

2.Configuration | Apache Flink Kubernetes Operator

1.部署K8S集群

可参照我之前的文章k8s集群搭建

2.Helm安装Flink-Operator

helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.10.0/

helm repo update

--如果没有这个命名空间就创建
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
--namespace=flink-operator \
--create-namespace \
--set webhook.create=false \
--version 1.10.0

3.安装prometheus

operator通过监控prometheus实现自动扩缩容,过两天调整为helm

可以采用helm安装也可采用yaml,由于helm没安装成功我就采用yaml安装了

# prometheus-basic.yaml
apiVersion: v1
kind: Namespace
metadata:
  name: monitoring
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  namespace: monitoring
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s
      evaluation_interval: 15s
    scrape_configs:
      - job_name: 'flink'
        static_configs:
          - targets: ['flink-metrics.flink-apps.svc.cluster.local:9249']
        metrics_path: /metrics
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus
  namespace: monitoring
spec:
  selector:
    matchLabels:
      app: prometheus
  replicas: 1
  template:
    metadata:
      labels:
        app: prometheus
    spec:
      containers:
      - name: prometheus
        image: prom/prometheus:v2.30.3
        args:
        - "--config.file=/etc/prometheus/prometheus.yml"
        - "--storage.tsdb.path=/prometheus"
        - "--web.enable-lifecycle"
        ports:
        - containerPort: 9090
        volumeMounts:
        - name: config-volume
          mountPath: /etc/prometheus/
        - name: storage-volume
          mountPath: /prometheus
      volumes:
      - name: config-volume
        configMap:
          name: prometheus-config
      - name: storage-volume
        emptyDir: {}
---
apiVersion: v1
kind: Service
metadata:
  name: prometheus
  namespace: monitoring
spec:
  type: NodePort
  ports:
  - port: 9090
    targetPort: 9090
    nodePort: 30090
  selector:
    app: prometheus

4.制作镜像包

Dockerfile内容,flink-test-1.0-SNAPSHOT.jar为测试代码

ARG FLINK_VERSION=1.18.1
FROM flink:${FLINK_VERSION}-scala_2.12
RUN mkdir -p /opt/flink/usrlib
COPY flink-test-1.0-SNAPSHOT.jar /opt/flink/usrlib/
COPY flink-metrics-prometheus-1.18.1.jar  /opt/flink/lib/
COPY flink-statebackend-rocksdb-1.18.1.jar  /opt/flink/lib/
COPY flink-connector-files-1.18.1.jar  /opt/flink/lib/
WORKDIR /opt/flink




# 1. 构建 Docker 镜像
# -t: 指定镜像名称和标签
# .: 使用当前目录的 Dockerfile
# --no-cache: 不使用缓存,从头构建
docker build -t zht-flink:1.18.1 . --no-cache

# 2. 为本地镜像添加远程仓库标签
# 格式: registry地址/命名空间/镜像名:标签
docker tag zht-flink:1.18.1 registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1

# 3. 推送镜像到阿里云镜像仓库
# 将标记的镜像推送到远程仓库
docker push registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1

5.创建命名空间和serviceaccount等

kubectl create namespace  flink-apps

kubectl -n flink-apps create serviceaccount flink-serviceaccount

kubectl -n flink-apps create clusterrolebinding flink-role-binding --clusterrole=edit --serviceaccount=flink-apps:flink-serviceaccount


kubectl create secret docker-registry flink-apps-secret \
--docker-server=registry.cn-hangzhou.aliyuncs.com \
--docker-username=xx \
--docker-password=xxxx \
-n flink-apps

kubectl patch serviceaccount flink-serviceaccount -p '{"imagePullSecrets": [{"name": "flink-apps-secret"}]}' -n  flink-apps

6.任务和扩缩容配置

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-autoscaling-sum-job
  namespace: flink-apps
spec:
  image: registry.cn-hangzhou.aliyuncs.com/dinkyhub/zht-flink:1.18.1
  flinkVersion: v1_18
  mode: native

  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    parallelism.default: "2"
    state.backend: rocksdb
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    metrics.reporters: prometheus
    metrics.reporter.prometheus.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prometheus.port: "9249"
    execution.checkpointing.interval: "10000"
    execution.checkpointing.mode: "EXACTLY_ONCE"
    execution.checkpointing.timeout: "600000"
    execution.checkpointing.min.pause: "10000"
    execution.checkpointing.max.concurrent.checkpoints: "1"
    # 启用 Source 指标收集
    metrics.source.enable: "true"
    metrics.source.records.in.enable: "true"
    metrics.source.records.out.enable: "true"
    metrics.source.records.lag.enable: "true"
    # 启用所有算子指标
    metrics.operator.enable: "true"
    metrics.operator.records.in.enable: "true"
    metrics.operator.records.out.enable: "true"
    # 启用任务指标
    metrics.task.enable: "true"
    metrics.task.records.in.enable: "true"
    metrics.task.records.out.enable: "true"
    # 设置指标收集间隔
    metrics.fetcher.update-interval: "1000"
    metrics.latency.interval: "1000"
    # 启用 IO 指标
    metrics.io.enable: "true" 
    jobmanager.scheduler: "adaptive"
    # 自动扩缩容配置
    job.autoscaler.enabled: "true"
    job.autoscaler.metrics.window: "20s"
    job.autoscaler.target.utilization: "0.30"
    job.autoscaler.scale.up.threshold: "0.05"
    job.autoscaler.scale.down.threshold: "0.1"
    job.autoscaler.metrics.memory.average: "1.0"
    job.autoscaler.metrics.memory.window: "5s"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.cooldown.period: "5s"
    job.autoscaler.scale.up.max.factor: "1.5"
    job.autoscaler.scale.down.max.factor: "0.5"    
    # 指标相关配置
    job.autoscaler.backpressure.enabled: "true"
    metrics.latency.granularity: "operator"
    web.backpressure.refresh-interval: "1000"
    metrics.backpressure.enabled: "true"
    metrics.backpressure.interval: "1000"
    metrics.backpressure.timeout: "60000"
    # 修改 job status metrics 配置
    metrics.job.status.enable: "STATE"
    # 新增 CPU 指标配置
    metrics.system.cpu: "true"
    metrics.system.cpu.load: "true"
    metrics.system.resource: "true"
    
  serviceAccount: flink-serviceaccount

  jobManager:
    resource:
      memory: "1024m"
      cpu: 1
    replicas: 1

  taskManager:
    resource:
      memory: "1024m"
      cpu: 1

  job:
    jarURI: local:///opt/flink/usrlib/flink-test-1.0-SNAPSHOT.jar
    entryClass: com.zht.sumJob
    args: []
    parallelism: 1
    upgradeMode: stateless

  podTemplate:
    spec:
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      containers:
        - name: flink-main-container
          volumeMounts:
            - name: checkpoint-data
              mountPath: /flink-data
    metadata:
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9249"

---
apiVersion: batch/v1
kind: Job
metadata:
  name: init-checkpoint-dir
  namespace: flink-apps
spec:
  template:
    spec:
      serviceAccountName: flink-serviceaccount
      containers:
      - name: init-dir
        image: busybox
        command: ["/bin/sh", "-c"]
        args:
          - |
            mkdir -p /data/flink-checkpoints/checkpoints
            mkdir -p /data/flink-checkpoints/savepoints
            chmod -R 777 /data/flink-checkpoints
        volumeMounts:
          - name: checkpoint-data
            mountPath: /data/flink-checkpoints
        resources:
          limits:
            cpu: "0.1"
            memory: "64Mi"
          requests:
            cpu: "0.1"
            memory: "64Mi"
      volumes:
        - name: checkpoint-data
          hostPath:
            path: /data/flink-checkpoints
            type: DirectoryOrCreate
      restartPolicy: Never
  backoffLimit: 4

---
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-ui
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: webui
      port: 8081
      targetPort: 8081
      nodePort: 30081
  selector:
    component: jobmanager
    app: flink-autoscaling-sum-job

---
apiVersion: v1
kind: Service
metadata:
  name: flink-metrics
  namespace: flink-apps
spec:
  type: NodePort
  ports:
    - name: metrics
      port: 9249
      targetPort: 9249
      nodePort: 30249
  selector:
    component: taskmanager
    app: flink-autoscaling-sum-job
注意点:

1.添加 flink-metrics-prometheus-1.18.1.jar 不然启动不了metrics
2.注意先排查metrics是否启用成功。curl http://localhost:9249/metrics查看是否有值
3.之后查看prometheus页面的target是否有flink metrics
4.yaml或者flink任务配置好启用监控的配置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/946610.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入Android架构(从线程到AIDL)_03 IPC的IBinder接口

目录 4、 IPC的IBinder接口 -- 定义与实现 IBinder接口的定義 IBinder接口的實現類 Java层的Binder基类定义​编辑 Binder基类的主要函数 Java层的BinderProxy基类定义 4、 IPC的IBinder接口 -- 定义与实现 IBinder接口的定義 大家都知道,当两个类都在同一个…

vscode代码AI插件Continue 安装与使用

“Continue” 是一款强大的插件,它主要用于在开发过程中提供智能的代码延续功能。例如,当你在编写代码并且需要进行下一步操作或者完成一个代码块时,它能够根据代码的上下文、语法规则以及相关的库和框架知识,为你提供可能的代码续…

我的Java-Web进阶--SpringMVC

1.三层架构与MVC模式 三层架构 MVC模式 2.SpringMVC执行流程 3.SpringMVC的基本使用方法 1. 配置 1.1 Maven依赖 首先&#xff0c;在pom.xml文件中添加Spring MVC的依赖&#xff1a; <dependencies><!-- Spring MVC --><dependency><groupId>org.…

flux中的缓存

1. cache&#xff0c;onBackpressureBuffer。都是缓存。cache可以将hot流的数据缓存起来。onBackpressureBuffer也是缓存&#xff0c;但是当下游消费者的处理速度比上游生产者慢时&#xff0c;上游生产的数据会被暂时存储在缓冲区中&#xff0c;防止丢失。 2. Flux.range 默认…

在基于Centos7的服务器上启用【Gateway】的【Clion Nova】(即 ReSharper C++ 引擎)

1. 检查启动报错日志&#xff0c;目录在 ~/.cache/JetBrains/CLion202x.x.x/log/backend.202x-xx-xx_xxxx.xxxx-err.log 2. 大致可能有两种报错 a. Process terminated. Couldnt find a valid ICU package installed on the system. 这个报错只需要装一下 libicu-devel 包即可…

【spring】参数校验Validation

前言 在实际开发中&#xff0c;我们无法保证客户端传来的请求都是合法的。比如一些要求必传的参数没有传递&#xff0c;传来的参数长度不符合要求等&#xff0c;这种时候如果放任不管&#xff0c;继续执行后续业务逻辑&#xff0c;很有可能就会出现意想不到的bug。 有人可能会…

Android实现队列出入队测试

演示效果: 安卓队列测试 入队操作 空队&#xff0c;满队判断 队列实现代码: package com.example.generalqueue;import android.content.Context; import android.widget.Toast;import java.util.Arrays;public class ArrayQueue {private int capacity;//队列容量private in…

Python Pyglet实战(1)——迷宫游戏

大家好啊&#xff0c;今天就来跟大家分享一下Python Pyglet的实战样例吧。 一.导入所需模块 1.导入__future__模块 首先&#xff0c;我们需要导入__future__模块中的division变量&#xff0c;此变量在__future__.py中的定义如下&#xff1a; division _Feature((2, 2, 0, …

第十一章 图论

题目描述&#xff1a; 阿里这学期修了计算机组织和架构课程。他了解到指令之间可能存在依赖关系&#xff0c;比如WAR&#xff08;读后写&#xff09;、WAW、RAW。 如果两个指令之间的距离小于安全距离&#xff0c;则会导致危险&#xff0c;从而可能导致错误的结果。因此&#…

“Gold-YOLO:基于聚合与分发机制的高效目标检测新范式”

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;编程探索专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2024年12月26日8点00分 神秘男子影, 秘而不宣藏。 泣意深不见, 男子自持重, 子夜独自沉。 论文源地址&#xff08;有视频&#xf…

python爬虫--小白篇【selenium自动爬取文件】

一、问题描述 在学习或工作中需要爬取文件资源时&#xff0c;由于文件数量太多&#xff0c;手动单个下载文件效率低&#xff0c;操作麻烦&#xff0c;采用selenium框架自动爬取文件数据是不二选择。如需要爬取下面网站中包含的全部pdf文件&#xff0c;并将其转为Markdown格式。…

去除el-tabs 下面的灰色横线,并修改每一项的左右间距,和字体颜色

HTML <el-tabs v-model"activeName" class"demo-tabs" tab-click"handleClick"><el-tab-pane label"全部" :name"null"></el-tab-pane><el-tab-pane label"问答陪练" name"general-t…

笔上云世界微服务版

目录 一、项目背景 二、项目功能 一功能介绍 三、环境准备 • 需要开发的端口 • Mysql 导入数据库 ​编辑 • Redis ​编辑 • RabbitMQ ​编辑 在创建blog虚拟主机(方法如下) • Nacos • Nginx 四、前端部署 五、后端部署 六、测试计划操作 一功能测试 二…

厦门大学联合网易提出StoryWeaver,可根据统一模型内给定的角色实现高质量的故事可视化

厦门大学联合网易提出StoryWeaver&#xff0c;可以根据统一模型内给定的角色实现高质量的故事可视化。可根据故事文本生成与之匹配的图像&#xff0c;并且确保每个角色在不同的场景中保持一致。本文的方法主要包括以下几个步骤&#xff1a; 角色图构建&#xff1a;设计一个角色…

vscode 多项目冲突:进行 vscode 工作区配置

问题&#xff1a;多个项目&#xff0c;每次打开会因为配置问题/包版本冲突&#xff0c;花费过长时间。 解决&#xff1a;可以通过启用工作区&#xff0c;使得各个项目的开发环境隔离。 vscode官网 对此有两种方法&#xff1a;方法一&#xff1a;启用工作区&#xff08;workspa…

Unity3D仿星露谷物语开发14之Custom Property Attribute

1、目标 创建自定义属性特性&#xff0c;类似于[SerializeField]的属性标签。 当用该自定义属性特性标记变量时&#xff0c;可以在Inspector面板中看到相应的效果。 2、Property类 &#xff08;1&#xff09;PropertyAttribute类 propertyAttribute是Unity中用于派生自定义…

赛博周刊·2024年度工具精选(图片资源类)

1、EmojiSpark emoji表情包查找工具。 2、fluentui-emoji 微软开源的Fluent Emoji表情包。 3、开源Emoji库 一个开源的emoji库&#xff0c;目前拥有4000个emoji表情。 4、中国表情包大合集博物馆 一个专门收集中国表情包的项目&#xff0c;已收录5712张表情包&#xff0c;并…

RK3588,基于 Npu 实现 yolov11 Segment 推理

Ultralytics YOLO11是一款尖端的、最先进的模型,它在之前YOLO版本成功的基础上进行了构建,并引入了新功能和改进,以进一步提升性能和灵活性。YOLO11设计快速、准确且易于使用,使其成为各种物体检测和跟踪、实例分割、图像分类以及姿态估计任务的绝佳选择。 https://github.…

MySQL启动报错:发生系统错误 5。拒绝访问。

参考:https://blog.csdn.net/qq_40762011/article/details/105768798/ 1、错误样式 错误样式&#xff0c;如下图所示&#xff1a; 2、导致原因 未使用管理员角色进行此操作&#xff1b; 3、解决办法 3.1、临时办法 不需要更改任何东西&#xff0c;只需要在打开CMD命令提示符时…

数势科技:解锁数据分析 Agent 的智能密码(14/30)

一、数势科技引领数据分析变革 在当今数字化浪潮中&#xff0c;数据已然成为企业的核心资产&#xff0c;而数据分析则是挖掘这一资产价值的关键钥匙。数势科技&#xff0c;作为数据智能领域的领军者&#xff0c;以其前沿的技术与创新的产品&#xff0c;为企业开启了高效数据分析…