一、Work Pool
1、What is a Work Pool
In plain terms: a switch for centrally managing deployment scripts
Work pools allow you to switch between different types of infrastructure and to create a template for deployments. Data platform teams find work pools especially useful for managing infrastructure configuration across teams of data professionals.
Common work pool types include Docker, Kubernetes, and serverless options such as AWS ECS, Azure ACI, and GCP Cloud Run.
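2、Relationship between Work Pool and Prefect Server
3、Creating a Work Pool
Note:
A separate process (a Prefect worker) has to be started to serve this work pool; creating the pool alone is not enough.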
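The two steps implied here, creating the pool and then starting a process that serves it, could be scripted roughly as follows. This is only a sketch: the pool name `weather-data` comes from later in this document, while the pool type `process` and the helper function names are assumptions made for illustration. The script only prints the `prefect` commands; it does not run them.

```python
import shlex

POOL_NAME = "weather-data"  # pool name used later in this document
POOL_TYPE = "process"       # assumption: choose the type that matches your infrastructure


def create_pool_command(name: str, pool_type: str) -> list[str]:
    """Build the CLI call that creates a work pool."""
    return ["prefect", "work-pool", "create", name, "--type", pool_type]


def start_worker_command(name: str) -> list[str]:
    """Build the CLI call that starts a worker polling the pool."""
    return ["prefect", "worker", "start", "--pool", name]


# Print both commands in copy-pasteable shell form.
for command in (create_pool_command(POOL_NAME, POOL_TYPE), start_worker_command(POOL_NAME)):
    print(shlex.join(command))
```

Keeping the commands as argument lists makes it easy to hand them to `subprocess.run` later if you prefer to execute them from Python.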
4、Writing the Dockerfile
Build a base image, tagged:
prefect:customer-define-new
# Use the Python 3.9 slim image as the base image
FROM registry.cn-beijing.aliyuncs.com/dkzx_test/python:3.9-slim
# Set the working directory
WORKDIR /app
# Update the package index and install git and Prefect
RUN apt update && \
    apt install -y git && \
    pip install --upgrade pip && \
    pip install -U prefect
# Print the Prefect and git versions
CMD ["sh", "-c", "prefect --version && git --version"]
Work pool image: uses the image built above,
prefect:customer-define-new
as its base image.
If your Prefect server is proxied by nginx with basic auth,
set ENV PREFECT_API_URL="http://admin:123@192.168.0.1/api" instead.
# Use the custom Prefect base image built above
FROM registry.cn-beijing.aliyuncs.com/dkzx_test/prefect:customer-define-new
# Set the Prefect API URL
ENV PREFECT_API_URL="http://192.168.0.1/api"
# Start the Prefect worker when the container runs
CMD ["prefect", "worker", "start", "--pool", "weather-data"]
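When the basic-auth credentials are embedded in PREFECT_API_URL as described above, characters such as `@`, `:` or `/` in the user name or password need to be percent-encoded, otherwise the URL is split in the wrong place. A small sketch using only the standard library; the function name `with_basic_auth` and the second example password are hypothetical.

```python
from urllib.parse import quote, urlsplit, urlunsplit


def with_basic_auth(api_url: str, user: str, password: str) -> str:
    """Return api_url with percent-encoded credentials placed before the host."""
    parts = urlsplit(api_url)
    # safe="" makes quote() also encode "/", which would otherwise end the host part.
    userinfo = f"{quote(user, safe='')}:{quote(password, safe='')}"
    return urlunsplit(parts._replace(netloc=f"{userinfo}@{parts.netloc}"))


print(with_basic_auth("http://192.168.0.1/api", "admin", "123"))
print(with_basic_auth("http://192.168.0.1/api", "admin", "p@ss:w/rd"))
```

The resulting string is what would go into the ENV PREFECT_API_URL line of the Dockerfile.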
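5、Starting the Work Pool from the Docker Image
Start the work pool's worker with Docker:
docker run -d --name weather-data-wook-pool --restart always <docker_image_id>
After the container starts, check its logs: the random ID shown there should match the one displayed in the Prefect dashboard,
which means the container started successfully.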
二、Deployments
A deployment can be created in two ways: a YAML file or a Python script.
1、YAML file
If you’d rather take a declarative approach to defining a deployment through a YAML file, use a prefect.yaml file.
Prefect provides an interactive CLI that walks you through creating a prefect.yaml file.
prefect init
2、Python script
2.1、Write a flow
2.2、Add the deployment configuration
from prefect import flow
# Only needed for the commented-out GitRepository alternative below:
# from prefect.blocks.system import Secret
# from prefect.runner.storage import GitRepository
# Source for the code to deploy (here, a Gitee repo)
# git@gitee.com:ajiot_vpp/vpp-py-weather-data.git
SOURCE_REPO = "https://<account>:<token>@gitee.com/ajiot_vpp/vpp-py-weather-data.git"
if __name__ == "__main__":
    # repo = GitRepository(
    #     url=SOURCE_REPO,
    #     credentials={"access_token": Secret.load("gitee-access-token")}
    # )
    repo = SOURCE_REPO
    flow.from_source(
        source=repo,
        entrypoint="prefect_task_list.py:task_schedule",
    ).deploy(
        name="ods_weather_data_region_hourly-deployment",
        work_pool_name="weather-data",  # Work pool target
        # parameters={"param_day": "", "method_key": ""},
        cron="*/10 * * * *",  # Cron schedule (every 10 minutes)
    )
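The cron expression "*/10 * * * *" used above fires at every minute divisible by 10 (:00, :10, ..., :50), not ten minutes after the deployment was created. The sketch below illustrates this for the minute field only, using just the standard library; `next_run` is a hypothetical helper and not part of Prefect.

```python
from datetime import datetime, timedelta


def next_run(after: datetime, step_minutes: int = 10) -> datetime:
    """Next time strictly after `after` whose minute is a multiple of step_minutes.

    Mirrors a "*/N * * * *" cron expression; assumes N divides 60 evenly.
    """
    base = after.replace(second=0, microsecond=0)
    minutes_to_add = step_minutes - (base.minute % step_minutes)
    return base + timedelta(minutes=minutes_to_add)


print(next_run(datetime(2024, 1, 1, 9, 3, 27)))  # 2024-01-01 09:10:00
print(next_run(datetime(2024, 1, 1, 9, 50)))     # 2024-01-01 10:00:00
```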
2.3、Deploy!
python deployment.py
三、Flow
A flow is the smallest unit of execution in this project, i.e. the concrete function that gets run.
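The entrypoint value "prefect_task_list.py:task_schedule" used in the deployment script names exactly such a function: the file path comes before the colon and the function name after it. As an illustration of that convention (not Prefect's actual loader), the sketch below resolves an entrypoint string to a callable with the standard library. The file content written to the temporary directory is made up for the demo; in the real project the function would additionally carry Prefect's `@flow` decorator, which is left out here so the sketch runs without Prefect installed.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_entrypoint(entrypoint: str, base_dir: Path):
    """Resolve "<file>.py:<function>" relative to base_dir and return the function."""
    file_part, func_name = entrypoint.rsplit(":", 1)
    path = base_dir / file_part
    spec = importlib.util.spec_from_file_location(path.stem, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return getattr(module, func_name)


with tempfile.TemporaryDirectory() as tmp:
    # Hypothetical stand-in for the real prefect_task_list.py in the repository.
    source = 'def task_schedule(param_day=""):\n    return f"run for {param_day}"\n'
    (Path(tmp) / "prefect_task_list.py").write_text(source)
    task = load_entrypoint("prefect_task_list.py:task_schedule", Path(tmp))
    result = task("2024-01-01")
    print(result)
```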