上篇我介绍了环境搭建、duckdb数据准备、sqlmesh数据模型、plan命令运行。本文继续介绍审计、测试、生成血缘关系以及python模型等。
有两种方法可以在SQLMesh中创建宏。一种方法是使用Python,另一种方法是使用Jinja。这里我们创建Python宏。让我们构建简单的Python宏。在宏文件夹下创建一个名为“ custom_calc.py ”的Python文件,并添加以下代码:
from sqlmesh import macro
@macro()
def multiply_by_10(evaluator, col):
return col * 10
请注意,你必须为SQLMesh添加@macro装饰器,以便将其识别为宏并在模型中使用,还需要添加‘ evaluator ’作为它的函数参数之一。
让我们将这个宏添加到“example.intermediate.sql”模型中:
MODEL (
name example.intermediate_model,
owner tommy,
kind FULL,
cron '@daily',
grain id,
column_descriptions (
id = 'primary key',
letter = 'alphabet letter',
value = 'random value',
updated_date = 'updated date',
new_col = 'a new column'
)
);
SELECT
id,
letter,
value,
@multiply_by_10(value) AS big_value,
updated_date,
'new_col' AS new_col
FROM
example.base_model
我们在模型中添加了@multiply_by_10(value) AS big_value。“@”符号用于在SQL模型中引用宏。创建Python宏并将其添加到模型中就像刚才看到的一样简单。
使用Python宏,可以不受限于SQL所能做的事情,你可以为数据转换构建任何逻辑。
提示:在宏Python文件中,也可以创建其他函数,而不需要‘ @macro ’装饰器和‘ evaluator ’参数。这样可以更有效地使用这些函数来帮助组织宏函数中的代码和逻辑。
审计(Audits)
SQLMesh审计基本上是dbt测试。在SQLMesh中有内置审计,如‘ unique ’和‘ not_null ’。你还可以创建自己的自定义审计。
创建自定义审计
你应该创建一个SQL文件,其中包含审计文件夹下的自定义审计。
AUDIT (
name assert_positive_ids,
);
SELECT *
FROM @this_model
WHERE
id < 0
这是为了检查“id”列只包含正数。
向模型添加自定义审计
接下来,我们将这个自定义审计以及其他一些内置审计添加到“base_model.sql”:
将这些行添加到MODEL块中:
audits (
assert_positive_ids,
unique_values(columns = id),
not_null(columns = id)
)
完整代码如下:
MODEL (
name example.base_model,
owner Yuki,
kind VIEW,
cron '@daily',
grain id,
column_descriptions (
id = 'primary key',
letter = 'alphabet letter',
value = 'random value',
updated_date = 'updated date'
),
audits (
assert_positive_ids,
unique_values(columns = id),
not_null(columns = id)
)
);
SELECT
id::INT,
letter::TEXT,
value::INT,
updated_date::DATE,
FROM
example.letters
运行的审计
SQLMesh使用‘ SQLMesh plan ’命令(在模型执行之后)自动运行审计。你也可以运行这个命令来只运行审计(你可能需要在运行这个命令之前应用一个计划):
sqlmesh audit
输出结果:
Found 3 audit(s).
assert_positive_ids on model example.base_model ✅ PASS.
unique_values on model example.base_model ✅ PASS.
not_null on model example.base_model ✅ PASS.
Finished with 0 audit errors and 0 audits skipped.
Done.
在SQLMesh中审计的一个好处是,默认情况下,如果审计失败,SQLMesh会停止管道的执行,以防止错误的数据继续执行。
测试
SQLMesh测试用于测试代码,而不是测试数据。我们只需要在yaml文件中为测试提供输入和预期输出。
创建测试
SQLMesh通过‘ SQLMesh create_test ’命令简化了这个过程。继续运行下面的代码,指定您想要为其创建测试的模型、它的上游模型和一个示例查询。
sqlmesh create_test example.intermediate_model --query example.base_model "SELECT * FROM example.base_model WHERE updated_date BETWEEN '2025-01-01' and '2025-01-15'"
下面是上面的命令为我生成的测试文件,在tests目录下生成test_intermediate_model.yaml文件,内容如下:
test_intermediate_model:
model: '"db"."example"."intermediate_model"'
inputs:
'"db"."example"."base_model"':
- id: 1
letter: A
value: 10
updated_date: 2025-01-07
- id: 2
letter: B
value: 20
updated_date: 2025-01-07
- id: 3
letter: C
value: 30
updated_date: 2025-01-07
outputs:
query:
- id: 1
letter: A
value: 10
big_value: 100
updated_date: 2025-01-07
new_col: new_col
- id: 2
letter: B
value: 20
big_value: 200
updated_date: 2025-01-07
new_col: new_col
- id: 3
letter: C
value: 30
big_value: 300
updated_date: 2025-01-07
new_col: new_col
运行测试
“ sqlmesh plan ”命令运行测试(在执行模型之前)以及“ sqlmesh test ”命令。让我们运行这两个命令:
sqlmesh plan dev
输出结果:
$ sqlmesh plan dev
======================================================================
Successfully Ran 1 tests against duckdb
----------------------------------------------------------------------
No changes to plan: project files match the `dev` environment
sqlmesh test
输出结果:
$ sqlmesh test
.
----------------------------------------------------------------------
Ran 1 test in 0.035s
OK
如果你还记得,我们为测试连接配置了一个不同的duckdb。这意味着该测试将使用测试连接“test.db”执行,而不是使用“db.db”。如果你想了解更多关于SQLMesh测试的知识,你会发现官方文档很有帮助!
血缘关系(DAG)
许多数据工具的典型特性是能够可视化数据血缘关系。SQLMesh也可以通过CLI或SQLMesh UI提供这种功能。使用CLI,您可以运行‘ sqlmesh dag file_name ’来生成简单的数据血缘关系。
sqlmesh dag dag.html
嗯,这是相当有限的。您只能看到整体的数据流,这对你来说可能不够。当你希望看到更详细的日期时,需要使用SQLMesh UI。为此,你必须安装一个依赖项:
pip install 'sqlmesh[web]'
然后运行以下命令在浏览器中打开UI:
sqlmesh ui
输出内容:
$ sqlmesh ui
INFO: Started server process [465829]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
这里提示你打开UI,你将看到一个漂亮的UI,如下所示(如果你没有看到血缘关系,你可能需要展开该区域或单击左侧的模型文件),如:点击了“base_model.sql”):
要查看列级血缘关系吗?你可以点击列。例如,让我们点击example.intermediate_model中的new_col、big_value和value列:
SQLMesh显示了‘ value ’和‘ big_value ’是如何来自上游表中的‘ value ’的。然而,new_col在我点击后变成了灰色。
当涉及到SQLMesh UI所提供的功能时,数据沿袭只是冰山一角。如果您选择这样做,您可以从这个UI执行所有的开发任务。既然我们在这里,让我们看看如何找到模型元数据/定义。
点击左侧的“数据目录”图标。它把你带到一个视图中,你可以看到模型元数据,包括模型和列描述:
Python模型
我知道你们很多人都是狂热的 Python 爱好者,有些事情用Python可以做得更好。虽然你需要在Python模型中返回pandas或Spark数据框架,但我将使用polar进行转换部分,因为它的整体实用性,如速度和干净的API:
“example.intermediate_py_model.py”:
import typing as t
from datetime import datetime
from sqlmesh import ExecutionContext, model
import pandas as pd
import polars as pl
@model(
name="example.intermediate_py_model",
owner="Yuki",
kind="FULL",
cron="@daily",
grain="id",
columns={
"id": "int",
"letter": "text",
"value": "int",
"big_value": "int",
"updated_date": "date",
"new_col": "text",
},
column_descriptions={
"id": "primary key",
"letter": "alphabet letter",
"value": "random value",
"big_value": "value * 10",
"updated_date": "updated date",
"new_col": "a new column",
},
)
def execute(
context: ExecutionContext,
start: datetime,
end: datetime,
execution_time: datetime,
**kwargs: t.Any,
) -> pd.DataFrame:
table = context.resolve_table("example.base_model")
df = (
pl.from_pandas(context.fetchdf(f"SELECT * FROM {table}"))
.select(
"id",
"letter",
"value",
pl.col("value").mul(10).alias("big_value"),
"updated_date",
pl.lit("new_col").alias("new_col"),
)
)
return df.to_pandas()
注意事项:
- 整体结构是相同的,包括模型属性。
- Python模型要求您返回pandas或Spark数据框架。
- Python模型需要指定列模式。
- 使用‘ ExecutionContext ’是Python模型中的一种典型方法。它提供了对上游表、全局变量等的访问。
- 我没有在这个Python模型中使用宏函数来计算big_value列。原因是我们在Python模型中引用自定义Python宏的方式有点麻烦(在撰写本文时)。如果你愿意,你可以这样做:
-
导入宏函数就像在Python模型中导入Python函数一样。
-
在‘ MacroEvaluator ’类中导入,并将其作为参数传递给宏函数。
- 或者你将你的函数定义为一个通用的Python函数,没有‘ @macro ’装饰器和‘ evaluator ’参数,这样你就不需要导入和传入‘ MacroEvaluator ’类(Tobiko Slack线程)。
SQLMesh中的Python模型非常灵活,因为只要它们返回pandas或Spark数据框架,你就可以做几乎任何事情。如果愿意,你甚至可以在SQLMesh中将数据摄取作业构建为Python模型。
最后总结
显然,本文无法涵盖SQLMesh所提供的所有内容。未来我们继续学习下面列出的相关内容:
- 深入了解模型类型/种类
- Pre/post语句
- 有用的CLI命令(table_dff, sqlmesh evaluate等)
- 开源Github Actions CI/CD Bot
- 等
SQLMesh是一个令人兴奋且不断发展的工具。我将继续与大家分享我的见解。如果有任何关于SQLMesh或其他工具的具体内容,请随时告诉我。