SQLMesh系列教程-2:SQLMesh入门项目实战(下篇)

上篇我介绍了环境搭建、duckdb数据准备、sqlmesh数据模型、plan命令运行。本文继续介绍审计、测试、生成血缘关系以及python模型等。

在这里插入图片描述

有两种方法可以在SQLMesh中创建宏。一种方法是使用Python,另一种方法是使用Jinja。这里我们创建Python宏。让我们构建简单的Python宏。在宏文件夹下创建一个名为“ custom_calc.py ”的Python文件,并添加以下代码:

from sqlmesh import macro

@macro()
def multiply_by_10(evaluator, col):
    return col * 10

请注意,你必须为SQLMesh添加@macro装饰器,以便将其识别为宏并在模型中使用,还需要添加‘ evaluator ’作为它的函数参数之一。

让我们将这个宏添加到“example.intermediate.sql”模型中:

MODEL (
    name example.intermediate_model,
    owner tommy,
    kind FULL,
    cron '@daily',
    grain id,
    column_descriptions (
        id = 'primary key',
        letter = 'alphabet letter',
        value = 'random value',
        updated_date = 'updated date',
        new_col = 'a new column'
    )
  );

  SELECT
    id,
    letter,
    value,
    @multiply_by_10(value) AS big_value,
    updated_date,
    'new_col' AS new_col
  FROM
    example.base_model

我们在模型中添加了@multiply_by_10(value) AS big_value。“@”符号用于在SQL模型中引用宏。创建Python宏并将其添加到模型中就像刚才看到的一样简单。

使用Python宏,可以不受限于SQL所能做的事情,你可以为数据转换构建任何逻辑。

提示:在宏Python文件中,也可以创建其他函数,而不需要‘ @macro ’装饰器和‘ evaluator ’参数。这样可以更有效地使用这些函数来帮助组织宏函数中的代码和逻辑。

审计(Audits)

SQLMesh审计基本上是dbt测试。在SQLMesh中有内置审计,如‘ unique ’和‘ not_null ’。你还可以创建自己的自定义审计。

创建自定义审计

你应该创建一个SQL文件,其中包含审计文件夹下的自定义审计。

AUDIT (
    name assert_positive_ids,
  );

  SELECT *
  FROM @this_model
  WHERE
    id < 0

这是为了检查“id”列只包含正数。

向模型添加自定义审计

接下来,我们将这个自定义审计以及其他一些内置审计添加到“base_model.sql”:

将这些行添加到MODEL块中:

audits (
      assert_positive_ids,
      unique_values(columns = id),
      not_null(columns = id)
    )

完整代码如下:

MODEL (
    name example.base_model,
    owner Yuki,
    kind VIEW,
    cron '@daily',
    grain id,
    column_descriptions (
        id = 'primary key',
        letter = 'alphabet letter',
        value = 'random value',
        updated_date = 'updated date'
    ),
    audits (
      assert_positive_ids,
      unique_values(columns = id),
      not_null(columns = id)
    )
  );

  SELECT
    id::INT,
    letter::TEXT,
    value::INT,
    updated_date::DATE,
  FROM
    example.letters

运行的审计

SQLMesh使用‘ SQLMesh plan ’命令(在模型执行之后)自动运行审计。你也可以运行这个命令来只运行审计(你可能需要在运行这个命令之前应用一个计划):

sqlmesh audit

输出结果:

Found 3 audit(s).
assert_positive_ids on model example.base_model ✅ PASS.
unique_values on model example.base_model ✅ PASS.
not_null on model example.base_model ✅ PASS.

Finished with 0 audit errors and 0 audits skipped.
Done.

在SQLMesh中审计的一个好处是,默认情况下,如果审计失败,SQLMesh会停止管道的执行,以防止错误的数据继续执行。

测试

SQLMesh测试用于测试代码,而不是测试数据。我们只需要在yaml文件中为测试提供输入和预期输出。

创建测试

SQLMesh通过‘ SQLMesh create_test ’命令简化了这个过程。继续运行下面的代码,指定您想要为其创建测试的模型、它的上游模型和一个示例查询。

sqlmesh create_test example.intermediate_model --query example.base_model "SELECT * FROM example.base_model WHERE updated_date BETWEEN '2025-01-01' and '2025-01-15'" 

下面是上面的命令为我生成的测试文件,在tests目录下生成test_intermediate_model.yaml文件,内容如下:

test_intermediate_model:
  model: '"db"."example"."intermediate_model"'
  inputs:
    '"db"."example"."base_model"':
    - id: 1
      letter: A
      value: 10
      updated_date: 2025-01-07
    - id: 2
      letter: B
      value: 20
      updated_date: 2025-01-07
    - id: 3
      letter: C
      value: 30
      updated_date: 2025-01-07
  outputs:
    query:
    - id: 1
      letter: A
      value: 10
      big_value: 100
      updated_date: 2025-01-07
      new_col: new_col
    - id: 2
      letter: B
      value: 20
      big_value: 200
      updated_date: 2025-01-07
      new_col: new_col
    - id: 3
      letter: C
      value: 30
      big_value: 300
      updated_date: 2025-01-07
      new_col: new_col

运行测试

“ sqlmesh plan ”命令运行测试(在执行模型之前)以及“ sqlmesh test ”命令。让我们运行这两个命令:

sqlmesh plan dev

输出结果:

$ sqlmesh plan dev
======================================================================
Successfully Ran 1 tests against duckdb
----------------------------------------------------------------------

No changes to plan: project files match the `dev` environment
sqlmesh test

输出结果:

$ sqlmesh test
.
----------------------------------------------------------------------
Ran 1 test in 0.035s

OK

如果你还记得,我们为测试连接配置了一个不同的duckdb。这意味着该测试将使用测试连接“test.db”执行,而不是使用“db.db”。如果你想了解更多关于SQLMesh测试的知识,你会发现官方文档很有帮助!

血缘关系(DAG)

许多数据工具的典型特性是能够可视化数据血缘关系。SQLMesh也可以通过CLI或SQLMesh UI提供这种功能。使用CLI,您可以运行‘ sqlmesh dag file_name ’来生成简单的数据血缘关系。

sqlmesh dag dag.html

在这里插入图片描述

嗯,这是相当有限的。您只能看到整体的数据流,这对你来说可能不够。当你希望看到更详细的日期时,需要使用SQLMesh UI。为此,你必须安装一个依赖项:

pip install 'sqlmesh[web]'

然后运行以下命令在浏览器中打开UI:

sqlmesh ui

输出内容:

$ sqlmesh ui
INFO:     Started server process [465829]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

这里提示你打开UI,你将看到一个漂亮的UI,如下所示(如果你没有看到血缘关系,你可能需要展开该区域或单击左侧的模型文件),如:点击了“base_model.sql”):
在这里插入图片描述

要查看列级血缘关系吗?你可以点击列。例如,让我们点击example.intermediate_model中的new_col、big_value和value列:

在这里插入图片描述

SQLMesh显示了‘ value ’和‘ big_value ’是如何来自上游表中的‘ value ’的。然而,new_col在我点击后变成了灰色。

当涉及到SQLMesh UI所提供的功能时,数据沿袭只是冰山一角。如果您选择这样做,您可以从这个UI执行所有的开发任务。既然我们在这里,让我们看看如何找到模型元数据/定义。

点击左侧的“数据目录”图标。它把你带到一个视图中,你可以看到模型元数据,包括模型和列描述:

在这里插入图片描述

Python模型

我知道你们很多人都是狂热的 Python 爱好者,有些事情用Python可以做得更好。虽然你需要在Python模型中返回pandas或Spark数据框架,但我将使用polar进行转换部分,因为它的整体实用性,如速度和干净的API:

“example.intermediate_py_model.py”:

import typing as t
from datetime import datetime

from sqlmesh import ExecutionContext, model
import pandas as pd
import polars as pl


@model(
    name="example.intermediate_py_model",
    owner="Yuki",
    kind="FULL",
    cron="@daily",
    grain="id",
    columns={
        "id": "int",
        "letter": "text",
        "value": "int",
        "big_value": "int",
        "updated_date": "date",
        "new_col": "text",
    },
    column_descriptions={
        "id": "primary key",
        "letter": "alphabet letter",
        "value": "random value",
        "big_value": "value * 10",
        "updated_date": "updated date",
        "new_col": "a new column",
    },
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    execution_time: datetime,
    **kwargs: t.Any,
) -> pd.DataFrame:
    
    table = context.resolve_table("example.base_model")
    df = (
        pl.from_pandas(context.fetchdf(f"SELECT * FROM {table}"))
        .select(
            "id",
            "letter",
            "value",
            pl.col("value").mul(10).alias("big_value"),
            "updated_date",
            pl.lit("new_col").alias("new_col"),
        )
    )

    return df.to_pandas()

注意事项:

  • 整体结构是相同的,包括模型属性。
  • Python模型要求您返回pandas或Spark数据框架。
  • Python模型需要指定列模式。
  • 使用‘ ExecutionContext ’是Python模型中的一种典型方法。它提供了对上游表、全局变量等的访问。
  • 我没有在这个Python模型中使用宏函数来计算big_value列。原因是我们在Python模型中引用自定义Python宏的方式有点麻烦(在撰写本文时)。如果你愿意,你可以这样做:
  1. 导入宏函数就像在Python模型中导入Python函数一样。

  2. 在‘ MacroEvaluator ’类中导入,并将其作为参数传递给宏函数。

图
在这里插入图片描述

  1. 或者你将你的函数定义为一个通用的Python函数,没有‘ @macro ’装饰器和‘ evaluator ’参数,这样你就不需要导入和传入‘ MacroEvaluator ’类(Tobiko Slack线程)。

SQLMesh中的Python模型非常灵活,因为只要它们返回pandas或Spark数据框架,你就可以做几乎任何事情。如果愿意,你甚至可以在SQLMesh中将数据摄取作业构建为Python模型。

最后总结

显然,本文无法涵盖SQLMesh所提供的所有内容。未来我们继续学习下面列出的相关内容:

  • 深入了解模型类型/种类
  • Pre/post语句
  • 有用的CLI命令(table_dff, sqlmesh evaluate等)
  • 开源Github Actions CI/CD Bot

SQLMesh是一个令人兴奋且不断发展的工具。我将继续与大家分享我的见解。如果有任何关于SQLMesh或其他工具的具体内容,请随时告诉我。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/967787.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

自主项目面试点总结

1、许苑–OJ判题系统 技术栈&#xff1a;Spring BootSpring Cloud AlibabaRedisMybatisMQDocker 项目地址: https://github.com/xuyuan-upward/xyoj-backend-microservice 1.1、项目介绍: 一个基于微服务的OJ系统&#xff0c;具备能够根据管理员预设的题目用例对用户提交的代…

Macbook Pro快速搭建Easysearch学习环境

在学习过程中&#xff0c;我们有时身边没有可用的服务器&#xff0c;这时就需要借助自己的 Mac 来安装和学习 Easysearch。然而&#xff0c;Easysearch 官网并未提供 Mac 版本的安装教程&#xff0c;下面我将详细整理我在 Mac 上安装和使用 Easysearch 的折腾经历。 Easysearc…

Arduino 第十三章:红外接收

Arduino 第十三章&#xff1a;红外接收 一、红外接收概述 红外接收在日常生活和电子制作中十分常见&#xff0c;像电视、空调等家电的遥控器就是利用红外信号来实现远程控制的。在 Arduino 项目里&#xff0c;借助红外接收模块能够让设备接收红外信号&#xff0c;进而实现诸如…

朝天椒USB服务器:解决加密狗远程连接

本文探讨朝天椒USB服务器用Usb Over Network技术&#xff0c;解决加密狗在虚拟机、云主机甚至异地的远程连接问题。 在企业数字化转型的浪潮中&#xff0c;加密狗作为防止软件盗版的重要手段&#xff0c;广泛应用于各类软件授权场景。然而&#xff0c;随着企业超融合进程不断加…

第二篇:电压与电流的“锡安之战”——电路定律在800V高压平台中的应用

——基尔霍夫与戴维南如何破解新能源汽车的“高压密码” 核心隐喻&#xff1a;电路定律的“数字起义” 在《黑客帝国》中&#xff0c;锡安的反抗军通过破解母体协议实现逆袭。而在新能源汽车的800V高压平台中&#xff0c; 基尔霍夫定律 和 戴维南定理 正是工程师手中的“通…

【牛客】动态规划专题一:斐波那契数列

文章目录 DP1 斐波那契数列法1&#xff1a;递归法2&#xff1a;动态规划法3&#xff1a;优化空间复杂度 2.分割连接字符串3. 给定一个字符串s和一组单词dict&#xff0c;在s中添加空格将s变成一个句子 DP1 斐波那契数列 法1&#xff1a;递归 // 递归 #include <iostream>…

innovus如何分步长func和dft时钟

在Innovus工具中&#xff0c;分步处理功能时钟&#xff08;func clock&#xff09;和DFT时钟&#xff08;如扫描测试时钟&#xff09;需要结合设计模式&#xff08;Function Mode和DFT Mode&#xff09;进行约束定义、时钟树综合&#xff08;CTS&#xff09;和时序分析。跟随分…

5-R循环

R 循环 ​ 有的时候&#xff0c;我们可能需要多次执行同一块代码。一般情况下&#xff0c;语句是按顺序执行的&#xff1a;函数中的第一个语句先执行&#xff0c;接着是第二个语句&#xff0c;依此类推。 编程语言提供了更为复杂执行路径的多种控制结构。 循环语句允许我们多…

DeepSeek AI R1推理大模型API集成文档

DeepSeek AI R1推理大模型API集成文档 引言 随着自然语言处理技术的飞速发展&#xff0c;大语言模型在各行各业的应用日益广泛。DeepSeek R1作为一款高性能、开源的大语言模型&#xff0c;凭借其强大的文本生成能力、高效的推理性能和灵活的接口设计&#xff0c;吸引了大量开发…

知识图谱_protege的安装

目录 1.下载protege 2.安装可视化工具Graphviz 3.配置 参考【知识图谱】3.Protege下载安装-CSDN博客 1.下载protege 我在官网下载不了所以我就没有在官网下载 项目首页 - Protege-5.5.0Windows版本快速下载指南:Protege是一个广受欢迎的、强大的知识建模工具&#xff0c;用…

从BERT到ChatGPT:大模型训练中的存储系统挑战与技术发展——论文泛读

计算机研究与发展 2024 Paper 论文阅读笔记整理 问题 以ChatGPT为代表的大模型在文字生成、语义理解等任务上表现卓越&#xff0c;但大模型的参数量在3年内增长数万倍&#xff0c;且仍呈现增长的趋势。大模型训练面临存储挑战&#xff0c;存储需求大&#xff0c;且具有独特的…

船舶维保管理系统

一、项目介绍 381.基于SpringBoot的船舶维保管理系统&#xff0c;系统包含四种角色&#xff1a;管理员、船家、维保人员、维保公司,系统分为前台和后台两大模块&#xff0c;主要功能如下。 船家&#xff1a; - 个人中心&#xff1a;管理个人信息。 - 公告管理&#xff1a;查看…

【详细版】DETR系列之Deformable DETR(2021 ICLR)

论文标题Deformable DETR: Deformable Transformers for End-to-End Object Detection论文作者Xizhou Zhu, Weijie Su, Lewei Lu, Bin Li, Xiaogang Wang, Jifeng Dai发表日期2021年03月01日GB引用> Xizhou Zhu, Weijie Su, Lewei Lu, et al. Deformable DETR: Deformable T…

从云原生到 AI 原生,谈谈我经历的网关发展历程和趋势

作者&#xff1a;谢吉宝&#xff08;唐三&#xff09; 编者按&#xff1a; 云原生 API 网关系列教程即将推出&#xff0c;欢迎文末查看教程内容。本文整理自阿里云智能集团资深技术专家&#xff0c;云原生产品线中间件负责人谢吉宝&#xff08;唐三&#xff09; 在云栖大会的精…

基于机器学习时序库pmdarima实现时序预测

目录 一、Pmdarima实现单变量序列预测1.1 核心功能与特性1.2 技术优势对比1.3 python案例1.3.1 时间序列交叉验证1.3.1.1 滚动交叉验证1.3.1.2 滑窗交叉验证 时间序列相关参考文章&#xff1a; 时间序列预测算法—ARIMA 基于VARMAX模型的多变量时序数据预测 基于机器学习时序库…

【文本处理】如何在批量WORD和txt文本提取手机号码,固话号码,提取邮箱,删除中文,删除英文,提取车牌号等等一些文本提取固定格式的操作,基于WPF的解决方案

企业的应用场景 数据清洗&#xff1a;在进行数据导入或分析之前&#xff0c;往往需要对大量文本数据进行预处理&#xff0c;比如去除文本中的无关字符&#xff08;中文、英文&#xff09;&#xff0c;只保留需要的联系信息&#xff08;手机号码、固话号码、邮箱&#xff09;。…

小游戏源码开发之可跨app软件对接是如何设计和开发的

专业小游戏开发的团队往往会面临跨领域和不同平台客户需要追加同一款游戏的需求&#xff0c;所以就要设计和开发一款可任意对接不同 App 软件的小游戏&#xff0c;那么针对这类需求小游戏开发团队早已有了成熟的解决方案&#xff0c;针对设计和开发可跨平台游戏对接大概流程简单…

C# Winform 使用委托实现C++中回调函数的功能

C# Winform 使用委托实现C中回调函数的功能 在项目中遇到了使用C#调用C封装的接口&#xff0c;其中C接口有一个回调函数的参数。参考对比后&#xff0c;在C#中是使用委托(delegate)来实现类似的功能。 下面使用一个示例来介绍具体的使用方式&#xff1a; 第一步&#xff1a;…

从基础到人脸识别与目标检测

前言 从本文开始&#xff0c;我们将开始学习ROS机器视觉处理&#xff0c;刚开始先学习一部分外围的知识&#xff0c;为后续的人脸识别、目标跟踪和YOLOV5目标检测做准备工作。我采用的笔记本是联想拯救者游戏本&#xff0c;系统采用Ubuntu20.04&#xff0c;ROS采用noetic。 颜…

未来替代手机的产品,而非手机的本身

替代手机的产品包括以下几种&#xff1a; 可穿戴设备&#xff1a;智能手表、智能眼镜等可穿戴设备可以提供类似手机的功能&#xff0c;如通话、信息推送、浏览网页等。 虚拟现实&#xff08;VR&#xff09;技术&#xff1a;通过佩戴VR头显&#xff0c;用户可以进行语音通话、发…