触发器实现海豚调度失败企业微信自动告警

原理

触发器监控工作流实例表,当工作流实例表中的状态更新后,针对状态为失败的任务进行企业微信告警。

发送企业微信消息函数

# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 创建插件plpython3u
create extension plpython3u;
# plpython3u为不受信语言,所以只能被超级用户使用
# 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying)
 RETURNS text
 LANGUAGE plpython3u
 SECURITY DEFINER
AS $function$
import requests
import json
"""
/*
 * 作者 : v-yuzhenc
 * 功能 : 给企业微信发送一条消息
 * message : 需要发送的消息,json格式
 * webhook : 企业微信机器人的webhook
 * */
"""
import requests
import json

# 企业微信自定义机器人的webhook地址
p_webhook = webhook
# 要发送的消息内容
p_message = json.loads(message)
# 发送POST请求
response = requests.post(p_webhook, data=json.dumps(p_message), headers={"Content-Type": "application/json"})

# 打印响应结果
return response.text
$function$
;
--将函数直接转给tool
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
--公开函数的执行权限
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
--将函数的执行权限授权给tool用户
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q

企业微信告警触发器

  • 由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用,所以,我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段,人工将海豚的用户对应的企业微信的userid维护上去
# 以dp用户登录etl数据库
psql etl -U dp
# 增加字段
alter table t_ds_user add wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';
# 维护wechat_userid中的数据
# 这里根据自己的企业实际情况做
update t_ds_user 
set wechat_userid = 'YuZhenChao'
where user_name = 'yuzhenchao'
;
# 创建触发器函数dp.tg_ds_udef_alert_wechat
CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
/*
 * 作者:v-yuzhenc
 * 功能:海豚调度工作流失败自动告警
 * */
declare
    i record;
    v_mobile varchar;
    v_content text;
    v_message varchar;
begin
    if new.state in (4,5,6) then 
        for i in (
            select
			     '<@'||d.wechat_userid||'>\r\n# [DolphinScheduler Job ]\r\n> 实例  id  :  ['||a.id::varchar||'/'||b.id||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||g.code||'/workflow/instances/'||a.id||'?code='||a.process_definition_code||')\r\n> 项目名称 : <font color=\"comment\">'||g.name||'('||g.code||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||e.name||'('||a.process_definition_code||')</font>'||'\r\n> 任务名称 : <font color=\"comment\">'||b.name||'('||b.task_code||')</font>'||'\r\n> 任务类型 : <font color=\"comment\">'||b.task_type||'</font>\r\n> 开始时间 : <font color=\"comment\">'||to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 任务状态 : <font color=\"warning\">执行失败</font>'||'\r\n> 所属用户 : <font color=\"comment\">'||d.user_name||'('||c.user_id||')</font>' as wechat_content
			    ,d.phone
			from t_ds_process_instance a 
			inner join t_ds_task_instance b 
			on (a.id = b.process_instance_id)
			inner join t_ds_task_definition c 
			on (b.task_code = c.code and b.task_definition_version = c."version")
			inner join t_ds_user d 
			on (c.user_id = d.id)
			inner join t_ds_process_definition e 
			on (a.process_definition_code = e.code and a.process_definition_version = e."version")
			inner join t_ds_project g 
			on (e.project_code = g.code)
			where c.task_type <> 'SUB_PROCESS'
			    and a.state = 6
			    and b.state = 6
			    and a.id = new.id
        ) loop 
            v_mobile := i.phone;
            v_content := i.wechat_content;
            v_message := $v_message${
    "msgtype":"markdown",
    "markdown": {
        "content":"$v_message$||v_content||$v_message$"
    }
}$v_message$;
            --告警
            perform tool.sp_send_wechat(v_message::json);
        end loop;
    end if;
    return new;
end;
$function$
;
--授权给dp
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;
# 创建时候触发器
create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
\q

测试

  • 新建一个工作流,选择SQL组件

在这里插入图片描述

  • 保存工作流
    在这里插入图片描述

  • 上线工作流并运行工作流
    在这里插入图片描述

  • 工作流运行失败
    在这里插入图片描述

  • 随即企业微信来了消息提醒
    在这里插入图片描述
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/55235.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Wi-Fi 6技术详解

1. 介绍 Wi-Fi 6&#xff0c;也称为802.11ax&#xff0c;是Wi-Fi技术的最新标准。它是对之前标准Wi-Fi 5&#xff08;802.11ac&#xff09;的升级和改进&#xff0c;旨在提供更高的速度、更大的容量、更好的性能和更高的可靠性。Wi-Fi 6技术的引入为无线网络带来了革命性的变化…

建模教程:如何利用3ds Max 和 After Effects 实现多通道渲染和后期合成

推荐&#xff1a; NSDT场景编辑器助你快速搭建可二次开发的3D应用场景 1. 创建基本场景 步骤 1 打开 3ds Max。在 透视视口。 打开 3ds Max 步骤 2 做一个茶壶&#xff0c;放在飞机上。 制作茶壶 步骤 3 我在场景中应用了几个灯光。我选择了光线追踪阴影作为阴影。 光线追…

K8s安全配置:CIS基准与kube-bench工具

01、概述 K8s集群往往会因为配置不当导致存在入侵风险&#xff0c;如K8S组件的未授权访问、容器逃逸和横向攻击等。为了保护K8s集群的安全&#xff0c;我们必须仔细检查安全配置。 CIS Kubernetes基准提供了集群安全配置的最佳实践&#xff0c;主要聚焦在两个方面&#xff1a;主…

hadoop与HDFS交互

一、利用Shell命令与HDFS进行交互 在进行HDFS编程实践前&#xff0c;需要首先启动Hadoop。可以执行如下命令启动Hadoop&#xff1a; cd /usr/local/hadoop ./sbin/start-dfs.sh #启动hadoop Hadoop支持很多Shell命令&#xff0c;其中fs是HDFS最常用的命令&#xff0c;利用fs…

安全基础 --- 编码(02)+ form表单实现交互

浏览器解析机制和XSS向量编码 <!-- javascript伪协议不能被urlcode编码&#xff0c;但可以被html实体编码:也是js协议的一部分&#xff0c;不能被编码js协议被解码后&#xff0c;URL解析器继续解析链接剩下的部分unicode编码可识别实现解码但符号不能被编码&#xff0c;编码…

MPDIoU: A Loss for Efficient and Accurate Bounding BoxRegression--论文学习笔记

超越GIoU/DIoU/CIoU/EIoU MPDIoU让YOLOv7和YOLACT双双涨点 目标检测上的指标对比&#xff1a; 论文地址&#xff1a; [2307.07662] MPDIoU: A Loss for Efficient and Accurate Bounding Box Regression (arxiv.org) 摘要 边界框回归&#xff08;Bounding Box Regression&am…

随机森林构造有哪些步骤?随机森林构造案例

在机器学习中&#xff0c;随机森林是一个包含多个决策树的分类器&#xff0c;并且其输出的类别是由个别树输出的类别的众数而定。 随机森林 Bagging 决策树 例如, 如果你训练了5个树, 其中有4个树的结果是True, 1个树的结果是False, 那么最终投票结果就是True随机森林够造过…

【Docker】Docker应用部署之Docekr容器安装Nginx

目录 一、搜索镜像 二、拉取镜像 三、创建容器 四、测试使用 一、搜索镜像 docker search nginx 二、拉取镜像 docker pull nginx # 不加冒号版本号 默认拉取最新版 三、创建容器 首先我们需要在宿主机创建数据卷目录 mkdir nginx # 创建目录 cd nginx # 进入目录 mkd…

JAVA开发工具-maven的安装与配置(最新最详细教程)

引言 Maven项目对象模型(POM)&#xff0c;可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的项目管理工具 软件。 Maven 除了以程序构建能力为特色之外&#xff0c;还提供高级项目管理工具。由于 Maven 的缺省构建规则有较 高的可重用性&#xff0c;所以常常用两…

【已解决】标签死活不响应单击事件

大家好&#xff0c;我是执念斩长河。今天在公司写代码的时候突然遇到一个问题&#xff0c;这个问题困扰了我不久&#xff0c;就是html中li标签不能响应我的单击事件。最后在仔细分析下&#xff0c;解决了这个问题。 文章目录 1、问题来源2、问题解决方案3、问题解决效果4、总结…

如何安装、部署、启动Jenkins

一、测试环境 Linux系统 Centos 7 二、安装步骤&#xff1a; 1、安装jdk 我安装的是jdk8&#xff0c;此处就不多说了&#xff0c;自己百度哈&#xff0c;很简单 2、安装jenkins 首先依次执行如下三个命令&#xff1a; 2.1、导入镜像&#xff1a; [rootcentos7 ~]# sudo …

下载JMeter的历史版本——个人推荐5.2.1版本

官网地址&#xff1a;https://archive.apache.org/dist/jmeter/binaries/

记一次centos 磁盘挂载过程

前言 最近买了云服务器磁盘&#xff0c;需要挂载&#xff0c;一下就由大猿来记录这次过程。 挂载过程 查看磁盘挂载情况 查看物理硬盘 lsblkfdisk -l标记分区 fdisk /dev/vdb格式化分区 xfs mkfs.xfs /dev/vdb mkfs.xfs -f /dev/vdbext4 mkfs.ext4 /dev/vdbxfs 和 ex…

基于opencv的几种图像滤波

一、介绍 盒式滤波、均值滤波、高斯滤波、中值滤波、双边滤波、导向滤波。 boxFilter() blur() GaussianBlur() medianBlur() bilateralFilter() 二、代码 #include <opencv2/core/core.hpp> #include <opencv2/highgui/highgui.hpp> …

Kafka系列之:记录一次Kafka Topic分区扩容,但是下游flink消费者没有自动消费新的分区的解决方法

Kafka系列之:记录一次Kafka Topic分区扩容,但是下游flink消费者没有自动消费新的分区的解决方法 一、背景二、解决方法三、实现自动发现新的分区一、背景 生产环境Kafka集群压力大,Topic读写压力大,消费的lag比较大,因此通过扩容Topic的分区,增大Topic的读写性能理论上下…

IDEA开启并配置services窗口

前言&#xff1a; 一般一个spring cloud项目中大大小小存在几个十几个module编写具体的微服务项目。此时&#xff0c;如果要调试测需要依次启动各个项目比较麻烦。 方法一&#xff1a; 默认第一次打开项目的时候&#xff0c;idea会提示是否增加这个选项卡&#xff0c;如果你没…

【ArcGIS Pro二次开发】(55):给多个要素或表批量添加字段

在工作中可能会遇到这样的场景&#xff1a;有多个GDB要素、表格&#xff0c;或者是SHP文件&#xff0c;需要给这个要素或表添加相同的多个字段。 在这种情况下&#xff0c;手动添加就变得很繁琐&#xff0c;于是就做了这个工具。 需求具体如下图&#xff1a; 左图是待处理数据…

【iOS】多线程 锁问题总结

文章目录 前言1. 你理解的多线程优点缺点 2. atomic 和 nonatomic 的区别及其作用3. GCD的队列类型 - 三种队列类型4. GCD的死锁问题5. 多线程之间的区别和联系6. 进程和线程&#xff1f;进程间的通信方式线程间的通信方式 6. iOS的线程安全手段如何保证 前言 iOS 锁和多线程的…

HCIP——BGP基础

BGP 一、BGP基础二、BGP的发展历史三、BGP在企业中的应用四、距离矢量型协议和路径矢量型协议五、BGP的特征六、BGP的对等关系七、BGP的数据包八、BGP的状态机九、BGP的工作过程十、BGP的路由黑洞十一、BGP的环路问题EBGP水平分割IBGP水平分割 十二、BGP的基本配置1、BGP的对等…