Apache Doris 基于 Job Scheduler 实现秒级触发任务调度能力

作者|SelectDB 技术团队

在数据管理愈加精细化的需求背景下,定时调度在其中扮演着重要的角色。它通常被应用于以下场景:

  • 定期数据更新,如周期性数据导入和 ETL 操作,减少人工干预,提高数据处理的效率和准确性。
  • 结合 Catalog 实现外部数据源数据定期同步,确保多源数据高效、准确的整合到目标系统中,满足复杂的业务分析需求。
  • 定期清理过期/无效数据,释放存储空间,避免过多过期/无效数据对系统性能产生影响。

在 Apache Doris 之前版本中,通常需要依赖于外部调度系统,如通过业务代码定时调度或者引入第三方调度工具、分布式调度平台来满足上述需求。然而,因受限于外部系统自身能力,可能无法满足 Doris 对调度策略及资源管理灵活性的要求。此外,如果外部调度系统出现故障,这不仅会增加业务风险,还需投入额外的运维时间和人力来应对。

引入 Job Scheduler

为解决上述问题,Apache Doris 在 2.1 版本中引入了 Job Scheduler 功能,实现了自主任务调度能力,调度的精准度可达到秒级。该功能的推出不仅保障了数据导入的完整性和一致性,更让用户能够灵活、便捷调整调度策略。同时,因减少了对外部系统的依赖,也降低了系统故障的风险和运维成本,为社区用户带来更加统一、可靠的使用体验。

Doris Job Scheduler 是一种基于预设计划运行的任务管理系统,能够在特定时间点或按照指定时间间隔触发预定义操作,实现任务的自动化执行。Job Scheduler 具备以下特点:

  • 高效调度:Job Scheduler 可以在指定的时间间隔内安排任务和事件,确保数据处理的高效性。采用时间轮算法保证事件能够精准做到秒级触发。
  • 灵活调度:Job Scheduler 提供了多种调度选项,如按 分、小时、天或周的间隔进行调度,同时支持一次性调度以及循环(周期)事件调度,并且周期调度也可以指定开始时间、结束时间。
  • 事件池和高性能处理队列:Job Scheduler 采用 Disruptor 实现高性能的生产消费者模型,最大可能的避免任务执行过载。
  • 调度记录可追溯:Job Scheduler 会存储最新的 Task 执行记录(可配置),通过简单的命令即可查看任务执行记录,确保过程可追溯。
  • 高可用:依托于 Doris 自身的高可用机制,Job Schedule 可以很轻松的做到自恢复、高可用。

(具体实现原理可参考本文“设计与实现”章节介绍)

语法及示例

01 语法说明

一条有效的 Job 语句需包含以下内容:

  • 关键字 CREATE JOB 需加作业名称,它在数据库中标识唯一事件。
  • ON SCHEDULE 子句用于指定 Job 作业的类型、触发时间和频率。
    • AT timestamp 用于一次性事件。它指定 JOB 仅在给定的日期和时间执行一次,AT CURRENT_TIMESTAMP 指定当前日期和时间。因 JOB 一旦创建则会立即运行,也可用于异步任务创建。
    • EVERY:用于周期性作业,可指定作业的执行频率,关键字后需指定时间间隔(周、天、小时、分钟)。
      • Interval:用于指定作业执行频率。1 DAY 表示每天执行一次,1 HOUR表示每小时执行一次,1 MINUTE 表示每分钟执行一次,1 WEEK 表示每周执行一次。
      • 子句EVERY包含可选 STARTS子句。STARTS后面为 timestamp 值,该值用于定义开始重复的时间,CURRENT_TIMESTAMP 用于指定当前日期和时间。JOB 一旦创建则会立即运行。
      • 子句EVERY包含可选 ENDS子句。ENDS 关键字后面为***timestamp*** 值,该值定义 JOB 事件停止运行的时间。
  • DO 子句用于指定 Job 作业触发时所需执行的操作,目前仅支持 Insert 语句。
CREATE
    JOB
    job_name
    ON SCHEDULE schedule
    [COMMENT 'string']
    DO execute_sql;

schedule: {
    AT timestamp 
   | EVERY interval
    [STARTS timestamp ]
    [ENDS timestamp ]
}

interval:
    quantity { WEEK |DAY | HOUR | MINUTE
             }
            


下方为简单的示例:

CREATE JOB my_job ON SCHEDULE EVERY 1 MINUTE DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;


该语句表示创建一个名为 my_job的作业,每分钟执行一次,执行的操作是将 db2.tbl2 中的数据导入到 db1.tbl1中。

02 举例说明

创建一次性的 Job: 在 2025-01-01 00:00:00 时执行一次,将 db2.tbl2中数据导入到 db1.tbl1 中。

CREATE JOB my_job ON SCHEDULE AT '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;


创建周期性的 Job,未指定结束时间: 在 22025-01-01 00:00:00 时开始每天执行 1 次,将 db2.tbl2 中数据导入到 db1.tbl1 中。

CREATE JOB my_job ON SCHEDULE EVERY 1 DAY STARTS '2025-01-01 00:00:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 WHERE  create_time >=  days_add(now(),-1);


创建周期性的 Job,指定结束时间: 在 2025-01-01 00:00:00 时开始每天执行 1 次,将 db2.tbl2 中的数据导入到 db1.tbl1 中,在 2026-01-01 00:10:00 时结束。

CREATE JOB my_job ON SCHEDULER EVERY 1 DAY STARTS '2025-01-01 00:00:00' ENDS '2026-01-01 00:10:00' DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2 create_time >=  days_add(now(),-1);


借助 Job 实现异步执行: 由于 Job 在 Doris 中是以同步任务的形式创建的,但其执行过程却是异步进行的,这一特性使得 Job 非常适合用于实现异步任务,例如常见的 insert into select 任务。

假设需要将db2.tbl2 中的数据导入到 db1.tbl1 中,这里只需要指定 JOB 为一次性任务,且开始时间设置为当前时间即可。

CREATE JOB my_job ON SCHEDULE AT current_timestamp DO INSERT INTO db1.tbl1 SELECT * FROM db2.tbl2;


基于 Catalog 与 Job Scheduler 的数据自动同步

以某电商场景为例,用户常常需要从 MySQL 中提取业务数据,并将这些数据同步到 Doris 中进行数据分析,从而支持精准的营销活动。而 Job Scheduler 可与数据湖能力 Multi Catalog 配合,高效完成跨数据源的定期数据同步

基于 Catalog 与 Job Scheduler 的数据自动同步.png

以上表为例,用户希望查询符合_总消费金额、最后一次访问时间、性别、所在城市_这几个数值条件的用户,并将满足条件的用户信息导入到 Doris 中,以便后续的定向推送。

1. 首先,创建一张 Doris 表

CREATE TABLE IF NOT EXISTS user_activity
(
    `user_id` LARGEINT NOT NULL COMMENT "用户id",
    `date` DATE NOT NULL COMMENT "数据灌入日期时间",
    `city` VARCHAR(20) COMMENT "用户所在城市",
    `age` SMALLINT COMMENT "用户年龄",
    `sex` TINYINT COMMENT "用户性别",
    `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
    `cost` BIGINT SUM DEFAULT "0" COMMENT "用户总消费",
    `max_dwell_time` INT MAX DEFAULT "0" COMMENT "用户最大停留时间",
    `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "用户最小停留时间"
)
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);


2. 其次,创建对应 MySQL 库的 Catalog

CREATE CATALOG activity PROPERTIES (
    "type"="jdbc",
    "user"="root",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:9734/user?useSSL=false",
    "driver_url" = "mysql-connector-java-5.1.49.jar",
    "driver_class" = "com.mysql.jdbc.Driver"
);


3. 最后,将 MySQL 数据导入到 Doris 中。采用 Catalog + Insert Into 的方式来导入全量数据,由于全量导入操作可能会引发系统服务波动,通常选择在业务闲暇时进行操作。

  • 一次性调度: 如下方代码所示,使用一次性任务来定时触发全量导入任务,触发时间为凌晨 3:00。
CREATE JOB one_time_load_job
ON SCHEDULE 
AT '2024-8-10 03:00:00'
DO
 INSERT INTO user_activity FROM SELECT * FROM activity.user.activity 
 


  • 周期调度: 用户也可以创建一个周期性的调度任务,定期更新最新的数据。
CREATE JOB schedule_load
ON SCHEDULE EVERY 1 DAY
DO
 INSERT INTO user_activity FROM SELECT * FROM activity.user.activity where create_time >=  days_add(now(),-1)


设计与实现

高效的调度通常伴随着大量的资源消耗,高精度的调度更是如此。传统的实现方式是直接使用 Java 内置的定时调度能力——定时调度线程周期访问,或采用一些定时调度的工具类库,但其在精度以及内存占用上存在较大的问题。为更好保障性能的前提下降低资源的占用,我们选择 TimingWheel 算法与 Disruptor 结合,实现秒级别的任务调度。

设计与实现

具体来说,利用 Netty 的 HashedWheelTimer 实现时间轮算法,Job Manager 会周期性(默认十分钟)地将未来事件放入时间轮中调度。为了保证任务高效触发并避免资源过度占用,采用 Disruptor 构建单生产者多消费者模型。时间轮仅负责触发,并不直接执行任务。对于到期需触发的任务时,会将其放入 Diapatch 线程,由其负责将任务分发至相应的执行线程池,对于需立即执行的任务,则直接将其投递至相应的任务执行线程池中。

对于单次执行事件,将在调度完成后删除事件定义;对于周期性事件,时间轮中的系统事件将定期拉取下一个周期的执行任务。这样可以避免大量任务集中在一个 Bucket 中,减少无意义的遍历、提高处理效率。

而对于事务型任务,Job Scheduler 能够通过与事务的强关联以及事务回调机制,确保事务型任务的执行结果与预期一致,从而保证数据的完整性和一致性。

结束语

Doris Job Scheduler 是一款强大且灵活的任务调度工具,是数据处理中必不可少的功能之一。除了在数据湖分析、内部 ETL 等常见场景的应用外,Job Scheduler 对于异步物化视图的实现也起到关键的作用。异步物化视图是一个预先计算并存储的结果集,其数据更新的频率与源表的变动紧密相关。当源表数据更新频繁时,为确保物化视图中数据保持最新状态,就需要对物化视图定期刷新。因此在 2.1 版本中,我们巧妙地利用 JOB 定时调度功能,保障了物化视图与源表数据的一致性,大幅降低了人工干预的成本。

未来,Doris Job Scheduler 还会支持以下特性:

  • 支持通过 UI 界面查看不同时段执行的任务分布情况。
  • 支持 JOB 流程编排,即 DAG JOB。这意味着我们可以在内部实现数仓任务编排,与 Catalog 功能叠加将会更高效地完成数据处理和分析工作。
  • 支持对导入任务、UPDATE、DELETE 操作进行定时调度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/529875.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java基于微信小程序的校园外卖平台设计与实现,附源码

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…

03-JAVA设计模式-代理模式详解

代理模式 什么是代理模式 Java代理模式是一种常用的设计模式,主要用于在不修改现有类代码的情况下,为该类添加一些新的功能或行为。代理模式涉及到一个代理类和一个被代理类(也称为目标对象)。代理类负责控制对目标对象的访问&a…

Github 2024-04-09 Python开源项目日报 Top10

根据Github Trendings的统计,今日(2024-04-09统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Python项目10Vue项目1JavaScript项目1系统设计指南 创建周期:2507 天开发语言:Python协议类型:OtherStar数量:241693 个Fork数量:42010 次…

贪心算法|860.柠檬水找零

力扣题目链接 class Solution { public:bool lemonadeChange(vector<int>& bills) {int five 0, ten 0, twenty 0;for (int bill : bills) {// 情况一if (bill 5) five;// 情况二if (bill 10) {if (five < 0) return false;ten;five--;}// 情况三if (bill …

SpringBoot快速入门笔记(6)

文章目录 Axios网络请求1、简介2、导入3、网络请求4、跨域问题5、数据渲染6、全局配置 Axios网络请求 1、简介 项目开发中&#xff0c;前端页面需要的数据往往要从服务器端获取&#xff0c;这必然涉及到和服务器的通信 Axios基于promise网络请求库&#xff0c;作用于node.js和…

Apache Incubator Answer 本地开发部署

文章目录 简介Github文档插件部署 Answer开发环境编译项目初始化项目运行项目 简介 一款适合任何团队的问答平台软件。 Apache Incubator Answer是一个开源项目&#xff0c;它是一个用于构建和部署问答系统的框架。该项目是Apache软件基金会的孵化器项目&#xff0c;提供一个…

PVE下安装配置openwrt和ikuai

开端 openwrt 和 ikuai 是比较出名的软路由系统。我最早接触软路由还是因为我的一个学长要改自己家里的网络&#xff0c;使用软路由去控制网络。我听说后便来了兴致&#xff0c;也在我家搞了一套软路由系统。现在我已经做完了&#xff0c;就想着写个文章记录一下。 软路由简介…

云数据库价格一瞥(华为云、百度智能云、腾讯云、阿里云)

最近&#xff0c;大家似乎和价格“磕”上了。本文仅考虑主流产品&#xff08; RDS MySQL、Redis &#xff09;的部分主流规格&#xff0c;对各家厂商的价格做一个对比&#xff0c;供参考。 TL;DR&#xff1a; 总体来看&#xff0c;各家云厂商价格趋于持平&#xff0c;部分主流商…

关于阿里云centos系统下宝塔面板部署django/中pip install mysqlclient失败问题的大总结/阿里云使用oss长期访问凭证

python版本3.12.0 问题1 解决方案 sudo vim /etc/profile export MYSQLCLIENT_CFLAGS"-I/usr/include/mysql" export MYSQLCLIENT_LDFLAGS"-L/usr/lib64/mysql" Esc退出编辑模式 &#xff1a;wq退出并且保存 问题二 说是找不到 mysql.h头文件 CentOS ‘…

【数据结构】考研真题攻克与重点知识点剖析 - 第 8 篇:排序

前言 本文基础知识部分来自于b站&#xff1a;分享笔记的好人儿的思维导图与王道考研课程&#xff0c;感谢大佬的开源精神&#xff0c;习题来自老师划的重点以及考研真题。此前我尝试了完全使用Python或是结合大语言模型对考研真题进行数据清洗与可视化分析&#xff0c;本人技术…

Python单元测试pytest捕获日志输出

使用pytest进行单元测试时&#xff0c;遇到了需要测试日志输出的情况&#xff0c;查看了文档 https://docs.pytest.org/en/latest/how-to/capture-stdout-stderr.html https://docs.pytest.org/en/latest/how-to/logging.html 然后试了一下&#xff0c;捕捉logger.info可以用…

【Qt】网络

目录 一、Udp Socket 二、Tcp Socket 三、HTTP 在进行网络编程之前&#xff0c;需要在项目中的 .pro 文件中添加 network 模块 有时添加之后要手动编译一下项目&#xff0c;使 Qt Creator 能够加载对应模块的头文件 一、Udp Socket 主要的类有两个&#xff1a;QUdpSocket …

Octopus:2B 参数语言模型即可媲美 GPT-4 的函数调用性能

近年来&#xff0c;大语言模型在 PC、智能手机和可穿戴设备的操作系统中应用逐渐成为趋势。 例如&#xff0c;MultiOn (Garg, 2024) 和 Adept AI (Luan, 2024) 等 AI 助理工具&#xff0c;以及 Rabbit R1 (Lyu, 2024) 和 Humane AI Pin (Chaudhri, 2024) 等 AI 消费产品在消费者…

Mac 装 虚拟机 vmware、centos7等,21年网络安全面经分享

链接: https://pan.baidu.com/s/1oZw1cLyl6Uo3lAD2_FqfEw?pwdzjt4 提取码: zjt4 复制这段内容后打开百度网盘手机App&#xff0c;操作更方便哦 centos8 链接: https://pan.baidu.com/s/10KWpCUa2JkwcjYlJZVogKQ?pwdn99a 提取码: n99a 复制这段内容后打开百度网盘手机App&…

电脑总是蓝屏怎么办,电脑总是蓝屏怎么办开不了机

工作离不开电脑&#xff0c;不过电脑经常会出现一些故障让我们崩溃不已&#xff0c;尤其类似电脑蓝屏开不了机这种&#xff0c;总是突然发生&#xff0c;让人猝不及防。那么面对这一情况&#xff0c;相信很多人都是不知道该如何处理的。这里提供两个方法&#xff0c;第一种方法…

网络安全---RSA公钥加密与签名

实验项目&#xff1a;RSA公钥加密与签名实验 1.实验目的 本实验的学习目标是让学生获得 RSA 算法的动手经验。 通过课堂学习&#xff0c;学生应该已经了解 RSA 算法的理论部分&#xff0c; 知道在数学上如何生成公钥、私钥以及如何执行加密、解密和签名生成、验证。 通过使用…

防SSL证书泄露服务器IP教程

在Web CDN&#xff08;内容分发网络&#xff09;中&#xff0c;防止SSL泄露源服务器IP是一个重要的安全考虑。下面是一些建议的方法来实现这一目标&#xff1a; 首先呢&#xff0c;我们隐藏服务器IP不要使用服务器IP生成的SSL证书&#xff0c;不然会泄露我们的服务器IP。 泄露了…

【BEV 视图变换】Fast-Ray 基于查找表LUT、多视角到单个三维体素转换

前言 在BEV感知方案中&#xff0c;将图像特征转为BEV特征&#xff0c;是关键的一步&#xff0c;这过程也称为2D视图变换。 本文介绍Fast-Ray方法&#xff0c;在Fast-BEV中被提出的&#xff0c;它是一种轻量级并且易于部署的视图转换方法&#xff0c;用于快速推理。 通过将多…

.net 6 集成NLog

.net 6 webapi项目集成NLog 上代码step 1 添加nugetstep 2 添加支持step 3 添加配置文件 结束 上代码 step 1 添加nuget 添加nuget 包 Roc step 2 添加支持 修改program.cs var builder WebApplication.CreateBuilder(args); // 添加NLog日志支持 builder.AddRocNLog();ste…

java中static关键字(尚未完善)

文章目录 static关键字static可修饰static方法举例static代码块拓展其他链接 static关键字 加载顺序类是构建对象的模板&#xff0c;一个类多个对象static修饰的方法或者变量都属于类&#xff0c;类独有的 static可修饰 修饰变量&#xff08;属于类变量&#xff0c;被创建出来…