钉钉消息异常通知
支持多线程
支持多进程
支持@多人
支持@所有人
from pprint import pprint
import requests
import json
import threading
import multiprocessing
from datetime import datetime
class DingTalkAlert:
def __init__(self, webhook_url, secret=None):
self.webhook_url = webhook_url
self.secret = secret
def send_message(self, title, error_message, **kwargs):
at_all = kwargs.get('at_all', False)
at_mobiles = kwargs.get('at_mobiles', [])
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
message_content = f"**标题**: {title}\n\n**异常信息**: {error_message}\n\n**发生时间**: {timestamp}"
# headers = {
# 'Content-Type': 'application/json'
# }
data = {
"msgtype": "text",
"text": {
"title": title,
"content": message_content
},
"at": {
"isAtAll": at_all,
"atUserIds": [],
"atMobiles": at_mobiles
}
}
pprint(data)
response = requests.post(self.webhook_url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
print(response.status_code)
return response.json()
def send_message_thread(self, title, error_message, **kwargs):
thread = threading.Thread(target=self.send_message, args=(title, error_message), kwargs=kwargs)
thread.start()
thread.join()
def send_message_process(self, title, error_message, **kwargs):
process = multiprocessing.Process(target=self.send_message, args=(title, error_message), kwargs=kwargs)
process.start()
process.join()
# 使用示例
if __name__ == "__main__":
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token='
alert = DingTalkAlert(webhook_url)
# 发送消息,@所有人
alert.send_message("告警通知", "这是一个告警消息,测试阶段,消息打扰,请多多包涵", at_all=True)
# 发送消息,@具体的多个负责人
alert.send_message("i告警通知", "这是一个告警消息,测试阶段,消息打扰,请多多包涵", at_mobiles=["1881xxxx363", ])
# 多线程发送消息,@所有人
alert.send_message_thread("告警通知", "这是一个多线程告警消息,测试阶段,消息打扰,请多多包涵", at_all=True)
# 多进程发送消息,@具体的多个负责人
alert.send_message_process("i告警通知", "这是一个多进程告警消息,测试阶段,消息打扰,请多多包涵", at_mobiles=["1881xxxx363", "13900000000"])