Python Tips6 基于数据库和钉钉机器人的通知

说明

起因是我第一版quant程序的短信通知失效了。最初认为短信是比较即时且比较醒目的通知方式,现在看来完全不行。

列举三个主要问题:

  • 1 延时。在早先还能收到消息的时候,迟滞就很严重,几分钟都算短的。
  • 2 完全丢失。我手机没有做任何设置,但是没有消息了。反正华为和腾讯,总有一个是锅。
    在这里插入图片描述
  • 3 短信模板。由于短信的模板需要审批,所以内容缩减的不像样子。所以信息阅读起来很累。

结论:不再采用短信方式,而是使用机器、邮件的方式通知。

内容

以下主要的内容有两部分:

  • 1 通过ORM方式操作数据库。
  • 2 使用钉钉机器人

1 数据

数据存在Mongo中,形如

在这里插入图片描述
在每次生成交易订单时,都会发出一条短信。所以这几天miss掉的订单,每个都是5个点,甚至十几个点的利润,太可惜了。100个短信包也比不上这些损失啊。

需要在订单的买和卖时间发生变动时发送通知信息。

对于发送程序来说,可以每30秒执行一次查询,每次执行时基于「当前时间」来判断需要发送的消息:

  • 1 基础筛选。只查询3天的候选数据。
  • 2 筛选买单。当 is_buy 字段不为1时,说明该笔订单的买信号没有发送,进行发送。
  • 3 筛选卖单。当 is_sell字段为不为1时,处理卖单消息。

所以对于业务来说,Mongo真的是更方便的,当需要增加字段时完全不用管表结构。

最近因为要做一些数据库方面的简单培训,在梳理的过程中,我发现其实很多数据库的出现是因为其特定的业务场景塑造的,而且通常能适应A场景的数据库,在B场景就不太行。所以A数据库通常无法取代B数据库

以Mongo和Mysql为例,是否A能取代B的问题应该是研究最多的(更大的考虑是否能用NoSQL取代SQL),现在来看,结论是不能。

目前可以做一些简单结论:

  • 1 当数据维度经常需要变动时,用Mongo。所以与操作和控制相关的场景下,主要用Mongo。业务形态不固定的时候,也用Mongo。例如本次,最初我并没有设计is_buy字段,但是要更改时几乎是瞬时完成的,似乎一开始就这样。
  • 2 当数据需要连表时,用Mysql。特别是结构化数据,用Mysql来存是比较方便的,每个字段也预先做了限定,不会犯错。

以下也简单总结了一些数据库的特点/场景:

  • 1 Redis : 高速kv存取,可用于实时计数等。
  • 2 Mongo: 文档性数据库,可用于集成部分的数据,适合作为主库。
  • 3 Mysql: 表格型数据库,可用于结构化部分的存储。
  • 4 Postgres: 算是Mysql的平替吧,在功能上更完善一些,许可更open。
  • 5 ES: 文档性数据库,主要用于解决模糊搜索效率的问题,用于搜索、推荐。
  • 6 Milvus: 向量数据库,主要解决大量中间向量的存储,以及相似性检索,用于搜图等功能。
  • 7 ClickHouse: 列式数据库。用于高效存储和统计,某种程度上也适合备份。速度太快,压缩比太高了。
  • 8 InfluxDB: 时序数据库。快速存储和分析时间数据。可用于物联网、金融等。
  • 9 Neo4j: 图库。用于存储真正的关联信息,适合用于知识图谱。
  • 10 SQLite:容易被忽略,但是这个还是非常棒的数据库。适合微型应用,嵌入式应用等。

还有很多数据库,对应着一些其他维度的细分。比如FAISS也是向量数据库,但是是在内存里;而Milvus是在硬盘上。然后还有很多是功能近似的,就不展开了。

2 ORM

之前我用pymongo自己搭了微服务,然后基于这个微服务又做了WMongo对象封装,本质上还是函数式的。结论是,函数式也好,agent(微服务)也好,的确是有其存在的必要和应用场景的。不过这种场景更偏向后端纯粹的数据处理与分析,在搭建应用对象(强调集成性和灵活性)时的确不方便。WMongo如果之后继续和pydantic结合,我应该也会做出一个类似工具。

言归正传, 操作Mongo时pymongo和mongoengine的关系就类似与pymysql和sqlalchemy;而Motor就像 aiomysql + sqlalchemy。

先从同步入手:

2.1 连接

from mongoengine import connect, disconnect,Document, StringField, IntField,DictField

# 可以直接使用函数式连接
connect(
    db='your_database_name',       # 数据库名称
    host='localhost',              # 主机地址,默认为localhost
    port=27017,                    # 端口号,默认为27017
    username='your_username',      # 可选,用户名
    password='your_password',      # 可选,密码
    authentication_source='admin'  # 可选,认证数据库,默认为admin
)
# 也可以使用url方式连接
# 集群
connect(
    host='mongodb://USER:PASSWD@IP1:PORT1,IP2:PORT2,IP3:PORT3,IP4:PORT4/%s?authSource=admin&replicaSet=mymeta' % db_name
)

本次仅按单机方式连接

# 单机模式
host_url = 'mongodb://USER:PASSWD@IP1:PORT1/%s?authSource=admin' % db_name
client = connect(host=host_url)
# 获取所有数据库
databases = client.list_database_names()
print("Databases:")
for db in databases:
    print(db)

# 遍历每个数据库,获取集合
for db_name in databases:
    db = client[db_name]
    collections = db.list_collection_names()
    print(f"\nCollections in {db_name}:")
    for collection in collections:
        print(collection)

# 断开连接
disconnect()

连接之后,做了一些函数式的操作,这个虽然不是ORM的主要方向,但有时候还是有点用的。接下来则是对数据的操作:

设置对象时,如果meta没有设置,对应的表名将会按类名的小写拆开建立并严格校验字段的一致性。这里我并不会映射现有表的全部字段,需要设置非严格模式。

    meta = {
        'collection': 'trade_orders',  # MongoDB 中的集合名
        'strict': False,  # 不强制字段验证,允许动态字段
    }

2.2 查询

我发现关于mongoengine的内容,大模型说的很多是错误的。
这是一个还不错的介绍。一文带你深入浅出 MongoEngine 经典查询【内附详细案例】

因为之前用pymongo做了很多开发,所以知道字符串是可以比较的。程序里的这个报错TypeError: '>=' not supported between instances of 'StringField' and 'str'是指>=只能用于数值型的筛选。而mongo里的gte方法则是通过变量名称的特殊构造来传递的(x__gte), 虽然有点妖,但的确是个好点子。

另外如果返回后会进行批量处理(而不是使用单个实例),那么可以在定义映射模型时定义返回字典。


# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):
    # 指定 MongoDB 中的集合名称
    # 启用动态模式,这样可以处理未定义的字段
    meta = {
        'collection': 'trade_orders',  # MongoDB 中的集合名
        'strict': False,  # 不强制字段验证,允许动态字段
        'indexes': [
            {'fields': ['is_buy_sms_tag']},
            {'fields': ['is_sell_sms_tag']}
        ]

    }
    
    # 定义字段
    model_signal = StringField(required=True, max_length=50)
    reason = StringField()
    np = FloatField()
    is_win = IntField()
    buy_price = FloatField()
    sell_price = FloatField()
    buy_dt = StringField()
    sell_dt = StringField()
    buy_amt = FloatField()

    strategy_name = StringField()
    is_buy_sms_tag = IntField()
    is_sell_sms_tag = IntField()

    def dict(self):
        res_dict = {}
        res_dict['model_signal'] = self.model_signal
        res_dict['reason'] = self.reason
        res_dict['np'] = self.np
        res_dict['is_win'] = self.is_win
        res_dict['buy_price'] = self.buy_price
        res_dict['sell_price'] = self.sell_price
        res_dict['buy_dt'] = self.buy_dt
        res_dict['buy_amt'] = self.buy_amt

        res_dict['strategy_name'] = self.strategy_name
        res_dict['is_buy_sms_tag'] = self.is_buy_sms_tag
        res_dict['is_sell_sms_tag'] = self.is_sell_sms_tag
        return res_dict



# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()

# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)

# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')

# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(buy_dt__gt=seven_days_ago_str).all()
recent_orders1 = [x.dict() for x in recent_orders]

 {'model_signal': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B',
  'reason': 'Control Sell',
  'np': 392.1,
  'is_win': 1,
  'buy_price': 5.533,
  'sell_price': 5.999,
  'buy_dt': '2024-09-30 09:54:00',
  'buy_amt': 4980.0,
  'strategy_name': '510500_normal_strong_2_near_120_far_1200_top_85_standard_gbdt_running_B_15_15',
  'is_buy_sms_tag': None,
  'is_sell_sms_tag': None},
 {'model_signal': None,
  'reason': None,
  'np': None,
  'is_win': None,
  'buy_price': 4.162,
  'sell_price': None,
  'buy_dt': '2024-09-30 13:43:00',
  'buy_amt': 4994.0,
  'strategy_name': '510300_normal_strong_2_near_60_far_1800_top_80_all_notnull_gbdt_running_A_10_15',
  'is_buy_sms_tag': None,
  'is_sell_sms_tag': None}]

3 钉钉

以下是发送函数。webhook url需要在钉钉群里创建机器人得到。

import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

# 钉钉 Webhook 和加签密钥

# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'


# 生成签名
def generate_sign(secret):
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign

# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):
    try:
        timestamp, sign = generate_sign(secret)
        url = f"{webhook}&timestamp={timestamp}&sign={sign}"
        headers = {'Content-Type': 'application/json'}

        data = {
            "msgtype": 'markdown',
            "markdown": {
                'title':title,
                "text": message
            }
        } 
        print(f"Sending message to URL: {url}")
        print(f"Message content: {data}")
        response = requests.post(url, headers=headers, data=json.dumps(data))
        print(f"Response: {response.status_code}, {response.text}")
        return response.status_code, response.text
    except Exception as e:
        print(f"Error sending message: {e}")
        return None, str(e)

send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title ='It is Title', message= '# It is Content\n## AA' )

在这里插入图片描述
效果比短信好多了。

4 整合逻辑

按照买消息和卖消息两个维度进行数据的整理和通知。对于买消息而言,消息的内容更少,而对于卖消息,需要展示的内容更多。在发布完消息后要把对应的标记打上,避免再次被筛选到。

作为Buy,提取以下字段发布

  • 1 Code
  • 2 Buy Datetime
  • 3 Buy Price
  • 4 Buy Amout
  • 5 Strategy Name

发布消息之后进行ACK(也就是把对应的字段置为1),所以,某种程度上说使用消息队列可能更自然一些。

# 先获取未消息的买单数据
from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(
    Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)
    
    ).all()
recent_orders1 = [x.dict() for x in recent_orders]

# 对任何一个买单
some_buy_order = recent_orders[0]
code = some_buy_order.strategy_name.split('_')[0]
msg_title = '### Buy %s' % code 
msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)

send_dingtalk_message_markdown('https://oapi.dingtalk.com/robot/send?access_token=xxx',
'xxx',title =msg_title, message= msg_text )

在这里插入图片描述
发送消息后,对相应的买单执行ACK

# ack
some_buy_order.is_buy_sms_tag = 1
some_buy_order.save()

循环执行买消息


from mongoengine.queryset.visitor import Q
# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(
    Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)
    ).all()
# buy / one
import time 
if len(recent_orders):
    print('共有%s个买消息' % len(recent_orders))
    for some_buy_order in recent_orders:
        # some_buy_order = recent_orders[0]
        code = some_buy_order.strategy_name.split('_')[0]
        msg_title = '### Buy %s' % code 
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)
        # msg 
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )
        # ack
        some_buy_order.is_buy_sms_tag = 1
        some_buy_order.save()
        # 防止消息发的太快
        time.sleep(1)

然后每次执行时都执行一次查询,如果有买单,按顺序循环输出即可。
在这里插入图片描述
同样的,对于卖单

  • 1 查询未发送的卖单
  • 2 与买单相比,卖单会输出更多的字段
if len(recent_sell_orders):
    print('共有%s个卖消息' % len(recent_sell_orders))
    for some_sell_order in recent_sell_orders:

        # some_sell_order = recent_sell_orders[0]

        code = some_sell_order.strategy_name.split('_')[0]
        msg_title = '### Sell %s' % code 
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)
        msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)

        # ack
        some_sell_order.is_sell_sms_tag = 1
        some_sell_order.save()
        # 防止消息发的太快
        time.sleep(1)

效果类似
在这里插入图片描述

5 APS运行

ubuntu系统自带的cron并不好用,所以我用python的apscheduler。只是在系统重启的时候需要自己管理,有两个办法可以解决这个问题:

  • 1 注册systemd服务,这样开机时服务可以跟随启动
  • 2 使用docker运行。

不过我给机器配了ups,一般情况下也不会关机,这个问题等之后再考虑吧,暂时就用nohup后台运行。

  • 1 在/home/workers/ 下放py文件
  • 2 在/home/local_aps/ 下放脚本文件
  • 3 在/home/local_aps/ 下放主启动文件

aps.py 主程序启动,aps默认是采用多线程方式控制多个worker(所以应该也也不是完全阻塞的)

from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingScheduler

def exe_sh(fpathname = None):
    os.system('sh %s ' % fpathname)

# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &

if __name__ == '__main__':
    sche1 = BlockingScheduler()
    sche1.add_job(exe_sh,'interval', seconds=30, kwargs ={'fpathname':'/home/local_aps/aps01.sh'})
    # sche1.add_job(exe_sh,'interval', seconds=86400, kwargs ={'fpathname':'/home/shs/clean_log.sh'})
    print('[S] starting inteverl')
    sche1.start()

aps01.sh用于灵活存放多个需要执行的python脚本

#!/bin/bash
python3 /home/workers/qtv102_dingtalk.py

qtv102_dingtalk.py 加上了logger部分

import logging
from logging.handlers import RotatingFileHandler

def get_logger(name, lpath='/var/log/'):
    logger = logging.getLogger(name)
    fpath = lpath + name + '.log'
    handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)

    # 设置日志格式为 [时间] - [日志级别] - 消息
    formatter = logging.Formatter('[%(asctime)s] - [%(levelname)s] - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    return logger

# 按日志格式写入数据
log_info_tmp = '[%s] - %s'
logger = get_logger('qtv102_dingtalk')


from mongoengine import connect, disconnect,Document, StringField, IntField,DictField,FloatField

# connect(
#     db='your_database_name',       # 数据库名称
#     host='localhost',              # 主机地址,默认为localhost
#     port=27017,                    # 端口号,默认为27017
#     username='your_username',      # 可选,用户名
#     password='your_password',      # 可选,密码
#     authentication_source='admin'  # 可选,认证数据库,默认为admin
# )

db_name = 'QTV102_Strategy'

# 单机模式
host_url = 'mongodb://xxx:xxx@192.168.0.159:24086/'
connect(db_name, host=host_url)


# 定义与 MongoDB 集合关联的类
class TradeOrders(Document):
    # 指定 MongoDB 中的集合名称
    # 启用动态模式,这样可以处理未定义的字段
    meta = {
        'collection': 'trade_orders',  # MongoDB 中的集合名
        'strict': False,  # 不强制字段验证,允许动态字段
        'indexes': [
            {'fields': ['is_buy_sms_tag']},
            {'fields': ['is_sell_sms_tag']}
        ]

    }

    # 定义字段
    # model_signal = StringField(required=True, max_length=50)
    reason = StringField()
    np = FloatField()
    is_win = IntField()
    buy_price = FloatField()
    sell_price = FloatField()
    buy_dt = StringField()
    sell_dt = StringField()
    buy_vol = IntField()
    sell_vol = IntField()
    buy_amt = FloatField()
    sell_amt = FloatField()

    strategy_name = StringField()
    is_buy_sms_tag = IntField()
    is_sell_sms_tag = IntField()

    def dict(self):
        res_dict = {}
        res_dict['reason'] = self.reason
        res_dict['np'] = self.np
        res_dict['is_win'] = self.is_win
        res_dict['buy_price'] = self.buy_price
        res_dict['sell_price'] = self.sell_price

        res_dict['buy_dt'] = self.buy_dt
        res_dict['buy_amt'] = self.buy_amt

        res_dict['sell_dt'] = self.sell_dt
        res_dict['sell_amt'] = self.sell_amt

        res_dict['buy_vol'] = self.buy_vol
        res_dict['sell_vol'] = self.sell_vol

        res_dict['strategy_name'] = self.strategy_name
        res_dict['is_buy_sms_tag'] = self.is_buy_sms_tag
        res_dict['is_sell_sms_tag'] = self.is_sell_sms_tag
        return res_dict



# 实例化时会为现有集合创建索引(是ensure index的方式)
from datetime import datetime, timedelta
# 获取当前时间
now = datetime.now()

# 计算7天前的时间
seven_days_ago = now - timedelta(days=7)

# 将 seven_days_ago 转换为字符串格式,以便与 buy_dt 和 sell_dt 进行比较
seven_days_ago_str = seven_days_ago.strftime('%Y-%m-%d %H:%M:%S')


# recent_orders1 = [x.dict() for x in recent_orders]

# 发送消息
# test_md = '''# 关于订单 \n## 1 订单 allls'''
# test_html = '''<h1>关于订单</h1> <p>allls</p>'''




# --- 发送消息
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse

# 钉钉 Webhook 和加签密钥

# mine
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxx'
secret ='xxx'


# 生成签名
def generate_sign(secret):
    timestamp = str(round(time.time() * 1000))
    secret_enc = secret.encode('utf-8')
    string_to_sign = '{}\n{}'.format(timestamp, secret)
    string_to_sign_enc = string_to_sign.encode('utf-8')
    hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
    sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
    return timestamp, sign

# 发送信息到群里面
def send_dingtalk_message_markdown(webhook, secret, title = None, message = None):
    try:
        timestamp, sign = generate_sign(secret)
        url = f"{webhook}&timestamp={timestamp}&sign={sign}"
        headers = {'Content-Type': 'application/json'}

        data = {
            "msgtype": 'markdown',
            "markdown": {
                'title':title,
                "text": message
            }
        }
        print(f"Sending message to URL: {url}")
        print(f"Message content: {data}")
        response = requests.post(url, headers=headers, data=json.dumps(data))
        print(f"Response: {response.status_code}, {response.text}")
        return response.status_code, response.text
    except Exception as e:
        print(f"Error sending message: {e}")
        return None, str(e)



from mongoengine.queryset.visitor import Q

# 查询 buy_dt 或 sell_dt 在7天之内的订单
recent_orders = TradeOrders.objects(
    Q(buy_dt__gt=seven_days_ago_str) & Q(is_buy_sms_tag__ne = 1)
    ).all()
logger.info(log_info_tmp % ('get_open_orders', 'recs %s' % len(recent_orders)))
# buy / one
import time
if len(recent_orders):
    print('共有%s个买消息' % len(recent_orders))
    for some_buy_order in recent_orders:
        # some_buy_order = recent_orders[0]
        code = some_buy_order.strategy_name.split('_')[0]
        msg_title = '### Buy %s' % code
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_buy_order.buy_dt, some_buy_order.buy_price,some_buy_order.buy_vol,some_buy_order.buy_amt ,some_buy_order.strategy_name)
        # msg
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text )
        # ack
        some_buy_order.is_buy_sms_tag = 1
        some_buy_order.save()
        # 防止消息发的太快
        time.sleep(1)

    logger.info(log_info_tmp % ('sent_open_orders', 'recs %s' % len(recent_orders)))

# 卖单
recent_sell_orders = TradeOrders.objects(
    Q(sell_dt__gt=seven_days_ago_str) & Q(is_sell_sms_tag__ne = 1)
    ).all()

logger.info(log_info_tmp % ('get_close_orders', 'recs %s' % len(recent_sell_orders)))
if len(recent_sell_orders):
    print('共有%s个卖消息' % len(recent_sell_orders))
    for some_sell_order in recent_sell_orders:

        # some_sell_order = recent_sell_orders[0]

        code = some_sell_order.strategy_name.split('_')[0]
        msg_title = '### Sell %s' % code
        msg_text = '''%s \n### Buy DT:%s\n### Buy Price:%s\n### Buy Vol:%s\n### Buy Amt:%s\nstrategy:%s''' %(msg_title,some_sell_order.buy_dt, some_sell_order.buy_price,some_sell_order.buy_vol,some_sell_order.buy_amt ,some_sell_order.strategy_name)
        msg_sell_text = '''\n### Sell DT:%s\n### Sell Price:%s\n### Sell Vol:%s\n ### Sell Amt:%s\n### NetProfit: %s\n### Reason:%s ''' % (some_sell_order.sell_dt, some_sell_order.sell_price, some_sell_order.sell_vol, some_sell_order.sell_amt, some_sell_order.np,some_sell_order.reason)
        send_dingtalk_message_markdown(webhook_url,secret,title =msg_title, message= msg_text+ msg_sell_text)

        # ack
        some_sell_order.is_sell_sms_tag = 1
        some_sell_order.save()
        # 防止消息发的太快
        time.sleep(1)

    logger.info(log_info_tmp % ('sent_close_orders', 'recs %s' % len(recent_sell_orders)))

观察日志

....
[2024-10-02 17:28:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:28:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:29:47] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:17] - [INFO] - [get_close_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_open_orders] - recs 0
[2024-10-02 17:30:47] - [INFO] - [get_close_orders] - recs 0
...

修改数据的消息发送字段,然后就看到重发消息了
在这里插入图片描述

到这里,基本ok了。之前的sms可以扔掉了。

总结一下:

  • 1 消息存放在数据库里。
  • 2 程序通过ORM定期查询需要发送的消息。
  • 3 发送钉钉消息后将属性更新,回存数据库

以后的策略消息仍然会先存放在数据库中,不过是通过ORM访问数据库还是走消息队列,以及发送到钉钉还是邮件还是其他可以再探讨。

ps: 属性的更新应该只是部分更新(update),而不是替换(replace),这样开销比较小。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/886242.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Mac 电脑配置yolov8运行环境实现目标追踪、计数、画出轨迹、多线程

&#x1f947; 版权: 本文由【墨理学AI】原创首发、各位读者大大、敬请查阅、感谢三连 &#x1f389; 声明: 作为全网 AI 领域 干货最多的博主之一&#xff0c;❤️ 不负光阴不负卿 ❤️ 文章目录 &#x1f4d9; Mac 电脑 配置 yolov8 环境&#x1f4d9; 代码运行推理测试模型训…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-27

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-27 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-09-27目录1. VisScience: An Extensive Benchmark for Evaluating K12 Educational Multi-modal Scientific Reasoning VisScience:…

kubeadm部署k8s集群,版本1.23.6;并设置calico网络BGP模式通信,版本v3.25--未完待续

1.集群环境创建 三台虚拟机&#xff0c;一台master节点&#xff0c;两台node节点 (根据官网我们知道k8s 1.24版本之后就需要额外地安装cri-dockerd作为桥接才能使用Docker Egine。经过尝试1.24后的版本麻烦事很多&#xff0c;所以此处我们选择1.23.6版本) 虚拟机环境创建参考…

Webstorm 中对 Node.js 后端项目进行断点调试

首先&#xff0c;肯定需要有一个启动服务器的命令脚本。 然后&#xff0c;写一个 debug 的配置&#xff1a; 然后&#xff0c;debug 模式 启动项目和 启动调试服务&#xff1a; 最后&#xff0c;发送请求&#xff0c;即可调试&#xff1a; 这几个关键按钮含义&#xff1a; 重启…

Geoserver关于忘记密码的解决方法

第一次安装后&#xff0c;如果你设置密码那一栏一直都是默认的话&#xff0c;那么登录密码应该是账户 admin&#xff0c;密码 geoserver 但是&#xff0c;如果你自己设置了密码和账户&#xff0c;登录又登录不上&#xff0c;或者忘记了&#xff0c;有以下方法可以解决。 本质…

CSS——文字闪烁效果

CSS——文字闪烁效果 今天来完成一个文字闪烁的动态效果&#xff0c;具体呈现效果如下&#xff1a; 文字闪烁动态效果 实现步骤 基础的样式 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"vi…

收单外包机构备案分析及建议

2020年9月16日&#xff0c;中国支付清算协会&#xff08;下称“中支协”或“协会”&#xff09;公示了首批收单外包服务机构备案名单。历经5年&#xff0c;约进行50次公示后&#xff0c;截至9月21日共备案收单外包机构32457家&#xff0c;取消备案机构316家&#xff0c;拟取消机…

8642 快速排序

### 思路 快速排序是一种分治算法&#xff0c;通过选择一个基准元素将数组分成两部分&#xff0c;然后递归地对每部分进行排序。每次分区后输出当前排序结果。 ### 伪代码 1. 读取输入的待排序关键字个数n。 2. 读取n个待排序关键字并存储在数组中。 3. 对数组进行快速排序&am…

【路径规划】基于球向量的粒子群优化(SPSO)算法在无人机路径规划中的实现

摘要 本文介绍了基于球形矢量的粒子群优化&#xff08;Spherical Particle Swarm Optimization, SPSO&#xff09;算法&#xff0c;用于无人机&#xff08;UAV&#xff09;路径规划。SPSO算法通过引入球形矢量的概念&#xff0c;增强了粒子群在多维空间中的探索和利用能力&…

安全中心 (SOC) 与 网络运营中心 (NOC)

NOC 和 SOC 之间的区别 网络运营中心 (NOC) 负责维护公司计算机系统的技术基础设施&#xff0c;而安全运营中心 (SOC) 则负责保护组织免受网络威胁。 NOC 专注于防止自然灾害、停电和互联网中断等自然原因造成的网络干扰&#xff0c;而 SOC 则从事监控、管理和保护。 NOC 提…

Junit和枚举ENUM

断言机制&#xff0c;JAVA中的断言机制是一种用于检查程序中某个条件是否为真的机制。它可以在程序运行时检查某个条件是否满足&#xff0c;如果不满足则会抛出AssertionError异常。 在java中,断言机制默认是关闭的。所以会输出u。 断言机制只是为了用来吃调试程序的&#xff0…

Electron 安装以及搭建一个工程

安装Node.js 在使用Electron进行开发之前&#xff0c;需要安装 Node.js。 官方建议使用最新的LTS版本。 检查 Node.js 是否正确安装&#xff1a; # 查看node版本 node -v # 查看npm版本 npm -v注意 开发者需要在开发环境安装 Node.js 才能编写 Electron 项目&#xff0c;但是…

C++中stack和queue的模拟实现

目录 1.容器适配器 1.1什么是适配器 1.2STL标准库中stack和queue的底层结构 1.3deque的简单介绍 1.3.1deque的原理介绍 1.3.2deque的优点和缺陷 1.3.3deque和vector进行排序的性能对比 1.4为什么选择deque作为stack和queue的底层默认容器 2.stack的介绍和模拟…

MybatisPlus代码生成器的使用

在使用MybatisPlus以后&#xff0c;基础的Mapper、Service、PO代码相对固定&#xff0c;重复编写也比较麻烦。因此MybatisPlus官方提供了代码生成器根据数据库表结构生成PO、Mapper、Service等相关代码。只不过代码生成器同样要编码使用&#xff0c;也很麻烦。 这里推荐大家使…

【ios】---swift开发从入门到放弃

swift开发从入门到放弃 环境swift入门变量与常量类型安全和类型推断print函数字符串整数双精度布尔运算符数组集合set字典区间元祖可选类型循环语句条件语句switch语句函数枚举类型闭包数组方法结构体 环境 1.在App Store下载Xcode 2.新建项目&#xff08;可以先使用这个&…

JSR303微服务校验

一.创建idea 二.向pom.xml添加依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.7.RELEASE</version></parent><properties><java.vers…

SpringCloud-基于Docker和Docker-Compose的项目部署

一、初始化环境 1. 卸载旧版本 首先&#xff0c;卸载可能已存在的旧版本 Docker。如果您不确定是否安装过&#xff0c;可以直接执行以下命令&#xff1a; sudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logro…

数字化那点事:一文读懂数字孪生

一、数字孪生的定义 数字孪生&#xff08;Digital Twin&#xff09;是指通过数字技术构建的物理实体的虚拟模型&#xff0c;能够对该实体进行全方位、动态跟踪和仿真预测。简单来说&#xff0c;数字孪生就是在一个设备或系统的基础上创造一个数字版的“克隆体”&#xff0c;这…

【RADARSAT Constellation Mission(RCM)卫星星座简介】

RADARSAT Constellation Mission&#xff08;RCM&#xff09;卫星星座是加拿大太空局&#xff08;CSA&#xff09;的下一代C波段合成孔径雷达&#xff08;SAR&#xff09;卫星星座&#xff0c;以下是对其的详细介绍&#xff1a; 一、基本信息 发射时间&#xff1a;2019年6月…

在Linux系统安装Nginx

注意&#xff1a;Nginx端口号是80(云服务器要放行) 我的是基于yum源安装 安装yum源(下面这4步就好了) YUM源 1、将源文件备份 cd /etc/yum.repos.d/ && mkdir backup && mv *repo backup/ 2、下载阿里源文件 curl -o /etc/yum.repos.d/CentOS-Base.repo ht…