Python一些可能用的到的函数系列130 UCS-Time Brick

说明

UCS对象是基于GFGoLite进行封装,且侧重于实现UCS规范。

内容

1 函数

我发现pydantic真是一个特别好用的东西,可以确保在数据传递时的可靠,以及对某个数据模型的描述。

以下,UCS给出了id、time相关的brick映射,并给出了时间到字符的转换。其方便之处在于,本地几乎不需要管怎么实现的,只要有服务支持即可。效率也足够高。

from typing import List, Optional
from pydantic import BaseModel

import requests as req 
class UCS(BaseModel):
    __version__ =1.1
    gfgo_lite_server: str = 'http://172.17.0.1:24090/'

    def get_brick_name(self, some_id = None):
        some_dict = {}
        some_dict['rec_id'] = some_id
        url = self.gfgo_lite_server + 'get_brick_name/'
        res = req.post(url, json = some_dict).json()
        return res 

    def get_brick_name_s(self, some_id_list = None):
        some_dict = {}
        some_dict['rec_id_list'] = some_id_list
        url = self.gfgo_lite_server + 'get_brick_name_s/'
        res = req.post(url, json = some_dict).json()
        return res 


    def gfgo(self, kwargs = None, pack_func = None):
        url = self.gfgo_lite_server + 'gfgo/'
        return req.post(url, json = {'kwargs':kwargs,'pack_func':pack_func}).json()


    def dt_str2num(self, some_dt_str = None):
        some_dict = {}
        some_dict['some_dt_str'] = some_dt_str
        url = self.gfgo_lite_server + 'str2num/'
        res = req.post(url, json = some_dict).json()
        return  res

    def dt_str2num_s(self, some_dt_str_list = None):
        some_dict = {}
        some_dict['some_dt_str_list'] = some_dt_str_list
        url = self.gfgo_lite_server + 'str2num_s/'
        res = req.post(url, json = some_dict).json()
        return  res

    # 时间名称
    def get_time_brick_name(self, dt_str_or_ts = None):
        some_dict = {}
        some_dict['dt_str_or_ts'] = dt_str_or_ts
        url = self.gfgo_lite_server + 'get_time_brick_name/'
        res = req.post(url, json = some_dict).json()
        return res 

    def get_time_brick_name_s(self, dt_str_or_ts_list = None):
        some_dict = {}
        some_dict['dt_str_or_ts_list'] = dt_str_or_ts_list
        url = self.gfgo_lite_server + 'get_time_brick_name_s/'
        res = req.post(url, json = some_dict).json()
        return res 

    # 1.1 >>>
    def get_brick_list(self, start_brick_name = None, end_brick_name = None):
        some_dict = {}
        some_dict['start_brick_name'] = start_brick_name
        some_dict['end_brick_name'] = end_brick_name

        url = self.gfgo_lite_server + 'get_brick_list/'
        res = req.post(url, json = some_dict).json()
        return res 

    def get_brick_bounds(self, brick_name = None):
        some_dict = {}
        some_dict['brick_name'] = brick_name

        url = self.gfgo_lite_server + 'get_brick_bounds/'
        res = req.post(url, json = some_dict).json()
        return res 

    # 1.2 >>> 时间
    # get_time_brick_bounds_s - 待
    def get_time_brick_bounds_s(self, brick_name_list = None, char_or_num = 'char'):
        some_dict = {}
        some_dict['brick_name_list'] = brick_name_list
        some_dict['char_or_num'] = char_or_num

        url = self.gfgo_lite_server + 'get_time_brick_bounds_s/'
        res = req.post(url, json = some_dict).json()
        return res 

    # get_time_brick_list
    def get_time_brick_list(self,start_brick_name = None, end_brick_name = None):
        some_dict = {}
        some_dict['start_brick_name'] = start_brick_name
        some_dict['end_brick_name'] = end_brick_name

        url = self.gfgo_lite_server + 'get_time_brick_list/'
        res = req.post(url, json = some_dict).json()
        return res 

2 应用

2.1 时间转换

ucs = UCS(gfgo_lite_server = 'http://xxx:24090/')
ucs.dt_str2num('2024-01-31 11:11:11')
1706670671

ucs.dt_str2num_s(['2024-01-31 11:11:11','2024-02-01 11:11:11' ])
[1706670671, 1706757071]

反函数可以用get_time_str1

In [19]: get_time_str1(1706670671)
Out[19]: '2024-01-31 11:11:11'

2.2 UCS-time

一个简单的应用如下:我需要制定一个静态数据同步计划,计划需要随着时间轴,按照时间块熟顺序向前推进。具体来说,这个对于数据库迁移(合并)与回测都有关系。

tbrick1 = ucs.get_time_brick_name('2024-01-31 11:11:11')
'2024.01.31.11'
tbrick2 = ucs.get_time_brick_name('2024-02-10 11:11:11')
'2024.02.10.11'

tbrick_list = ucs.get_time_brick_list(start_brick_name = tbrick1, end_brick_name = tbrick2)
['2024.01.31.11', '2024.01.31.12', '2024.01.31.13', ...]

ucs.get_time_brick_bounds_s(tbrick_list[:3])
[['2024-01-31 11:00:00', '2024-01-31 12:00:00'],
 ['2024-01-31 12:00:00', '2024-01-31 13:00:00'],
 ['2024-01-31 13:00:00', '2024-01-31 14:00:00']]

btw, 在探索完UCS-time之后,我发现大约因为时间观念比较深入人心,其实是可以更自由的转换的,

# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):
    some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(some_dt_str1.split('.')[:4])

'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''

这些函数也被纳入基础函数,有时候可以起到辅助的作用,特别在使用GFGolite不那么方便的时候。

一个更具体的场景,我觉得是具有通用性的,描述如下:

  • 1 数据没有顺序id,但是具备入库时间(create_time)
  • 2 数据入库时间是不连续的,有一些时间段密集,更多的时间可能空闲,但也并不是一条没有。
  • 3 每次执行时,worker总是能够有效推进一个brick

具体做法:

p01: 将需要的时间块按序持久化为本地文件,并将对应的bounds作为字典

p02: 获取当前时间的时间块

# 将标准时间字符串转为brick
def timestr2timeblock(some_time_str = None):
    x1 = some_time_str.replace('-','.').replace(' ','.').replace(':','.')
    return '.'.join(x1.split('.')[:4])

the_time_now = get_time_str1()
the_time_brick = timestr2timeblock(the_time_now)

p03:从gb中获取上一次处理过的最新brick,并锁定在时间轴的位置。


worker_buffer_space = 'sp_worker.general'
tier1 = 'some_tier'
tier2 = 'ucs_time_brick_ordered.sniffer'
prefix = '.'.join([worker_buffer_space,tier1,tier2]) +'.'

# ==========================  Load
time_brick_list = from_pickle('time_brick_list')
time_bounds_dict = from_pickle('time_bounds_dict')
gb = GlobalBuffer()
watch_brick_pos = time_brick_list.index(the_time_brick)

p04:按照已处理之后,当前观察时间之前的原则,选出available_time_brick_list

latest_data_brick = gb.getx(prefix +'last_brick_handled')
if latest_data_brick is None:
    start_pos = 0
else:
    start_pos =  time_brick_list.index(latest_data_brick)

available_time_brick_list = time_brick_list[start_pos+1:watch_brick_pos]

p05:按顺序遍历时间块,直到处理到一个有效的brick才中断本次循环

# cur_data_brick = available_time_brick_list[0]
for cur_data_brick in available_time_brick_list:
    print(cur_data_brick)
    cur_bound = time_bounds_dict[cur_data_brick]
    print(cur_bound)
	resp = 【获取待处理数据】
    if len(resp):
        res_df = pd.DataFrame(resp, columns = keep_cols)
        if ok_to_handle:
        	【将结果分批删除】
            listofdict2 = slice_list_by_batch2(xxx.to_dict(orient='records'), 1000)
            for some_listofdict in listofdict2:
        【更新已处理块信息】
        gb.setx(prefix +'last_brick_handled',cur_data_brick, persist=True)
        break

通过循环或者定时任务唤起这样的worker就可以。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/778912.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python No interpreter

在 Python 的环境中,如果你遇到了 "No interpreter" 的错误或提示,这通常意味着你的开发环境或IDE(如PyCharm、VS Code、Jupyter等)没有找到 Python 解释器。要解决这个问题,你可以按照以下步骤操作&#xf…

基于Transformer神经网络的锂离子电池剩余使用寿命估计MATLAB实现【NASA电池数据集】

Transformer神经网络 基于Transformer神经网络的锂离子电池剩余使用寿命估计是一种先进的方法,它利用了Transformer模型在处理序列数据方面的优势。 Transformer能够有效地捕捉时间序列中的长程依赖关系和非线性模式,相比传统的基于循环神经网络&…

InnoDB中的表级锁、页级锁、行级锁详解

MySQL支持三种层级的锁定 我们知道,MySQL支持三种层级的锁定,分别为: 表级锁定 表级锁是MySQL中锁定粒度最大的一种锁,表示对当前操作的整张表加锁,它实现简单,资源消耗较少,被大部分MySQL引…

【C++/STL】优先级队列的介绍与模拟实现仿函数

✨ 万物与我皆是自由诗 🌏 📃个人主页:island1314 🔥个人专栏:C学习 🚀 欢迎关注:👍点赞 👂&#x1…

深入理解TCP协议格式(WireShark分析)

传输控制协议(TCP)是互联网中最为关键的通信协议之一。了解TCP协议的细节不仅对于网络工程师至关重要,对于任何涉及网络通信的软件开发人员而言都是必备的知识。本文旨在深入探讨TCP协议,从协议的基本概述到其工作机制&#xff0c…

多维度多场景文档门户,鸿翼ECM文档云打造文档管理新范式

​在现代企业运营中,内容协作的效率直接影响到组织的整体表现和竞争力。传统的文档管理系统都是通过目录结构的方式进行文件管理,在实际业务中无法满足用户多视角、多维度、多场景的文档业务需求。因此,搭建结合文档体系的业务门户是许多企业…

AI绘画【光影模型】,穿越赛博迷雾,重塑光影艺术本真魅力

有时候是不是觉得单纯依靠大模型产生的图片作品光线方面平平无奇,依靠提示词,各种权重的调整费了九牛二虎之力才抽到一张感觉还算满意的作品。这个时候我们可以考虑结合相关Lora来进行。今天带来了一款光影氛围灯效果Lora——None-光染摄影,该…

进程的概念

一.进程和程序的理解 首先抛出结论:进程是动态的,暂时存在于内存中,进程是程序的一次执行,而进程总是对应至少一个特定的程序。 程序是静态的,永久的存在于磁盘中。 程序是什么呢?程序其实就是存放在我们…

Windows如何查看端口是否占用,并结束端口进程

需求与问题:前后端配置了跨域操作,但是仍然报错,可以考虑端口被两个程序占用,找不到正确端口或者后端接口书写是否规范,特别是利用Python Flask书写时要保证缩进是否正确! Windows操作系统中,查…

Matlab中collectPlaneWave函数的应用

查看文档如下: 可以看出最多5个参数,分别是阵列对象,信号幅度,入射角度,信号频率,光速。 在下面的代码中,我们先创建一个3阵元的阵列,位置为:(-1,0,0&#x…

代码随想录算法训练Day57|LeetCode200-岛屿数量、LeetCode695-岛屿的最大面积

岛屿数量 题目描述 力扣200-岛屿数量 给你一个由 1(陆地)和 0(水)组成的的二维网格,请你计算网格中岛屿的数量。 岛屿总是被水包围,并且每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。 此…

设计模式之单例模式(Java)

单例模式实现方式:懒汉式、饿汉式、双重检查、枚举、静态内部类; 懒汉式: /*** 懒汉式单例模式* author: 小手WA凉* create: 2024-07-06*/ public class LazySingleton implements Serializable {private static LazySingleton lazySinglet…

操作系统中的权限说明

什么是权限 权限在操作系统中是一个重要的功能,它允许你控制谁可以读取、写入或执行某个文件。不同的操作系统和文件系统可能有不同的权限模型,但在类Unix系统(如Linux和macOS)中,文件权限通常由三部分组成&#xff1a…

提升系统稳定性:熔断、降级和限流策略详解

文章目录 前言一、熔断(Circuit Breaker)二、降级(Degradation)三、限流(Rate Limiting)四、应用案例五、小结推荐阅读 前言 随着互联网业务的快速发展,系统稳定性和高可用性成为现代分布式系统…

Spring源码十三:非懒加载单例Bean

上一篇Spring源码十二:事件发布源码跟踪中,我们介绍了Spring中是如何使用观察者设计模式的思想来实现事件驱动开发的:实际上就是将所有监听器注册到广播器中,并通过监听该事件的监听器来处理时间的。结合前面十二篇文章我们将Spri…

电表读数检测数据集VOC+YOLO格式18156张12类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):18156 标注数量(xml文件个数):18156 标注数量(txt文件个数):18156 标…

cs224n作业4

NMT结构图:(具体结构图) LSTM基础知识 nmt_model.py: 参考文章:LSTM输出结构描述 #!/usr/bin/env python3 # -*- coding: utf-8 -*-""" CS224N 2020-21: Homework 4 nmt_model.py: NMT Model Penchen…

【0基础学爬虫】爬虫基础之scrapy的使用

【0基础学爬虫】爬虫基础之scrapy的使用 大数据时代,各行各业对数据采集的需求日益增多,网络爬虫的运用也更为广泛,越来越多的人开始学习网络爬虫这项技术,K哥爬虫此前已经推出不少爬虫进阶、逆向相关文章,为实现从易到…

九浅一深Jemalloc5.3.0 -- ④浅*配置

目前市面上有不少分析Jemalloc老版本的博文,但最新版本5.3.0却少之又少。而且5.3.0的架构与5之前的版本有较大不同,本着“与时俱进”、“由浅入深”的宗旨,我将逐步分析最新release版本Jemalloc5.3.0的实现。 另外,单讲实现代码是…

多功能工具网站

江下科技在线应用-免费PDF转换成Word-word转pdf-无需下载安装 (onlinedo.cn)https://www.onlinedo.cn/