Python脚本之操作Redis Cluster【三】

本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/135485606

之前写了2篇 操作redis集群的 https://blog.csdn.net/zyooooxie/article/details/123760358 、 https://blog.csdn.net/zyooooxie/article/details/112484045 ,这期再分享下 日常我咋用的。

【实际这篇博客推迟发布N个月】

个人博客:https://blog.csdn.net/zyooooxie

【以下所有内容仅为个人项目经历,如有不同,纯属正常】

代码 : redis-py

"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""

import math
import time
import random

from deprecated import deprecated
from typing import Optional, Union, List

from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
from redis.commands.core import BasicKeyCommands, ListCommands, SetCommands, HashCommands

from user_log import Log


# pip install redis==4.4.1

@deprecated(reason="不推荐使用")
def redis_connect(host: str, port: int, pwd: str):
    """

    :param host:
    :param port:
    :param pwd:
    :return:
    """

    Log.info(f'{host}, {port}, {pwd}')

    from redis import Redis

    r = Redis(host=host, port=port, decode_responses=True, password=pwd, db=0)
    Log.info(r)

    return r


@deprecated(reason="不推荐使用")
def get_cluster_info(host: str, port: int, pwd: str):
    """

    :param host:
    :param port:
    :param pwd:
    :return:
    """

    rc = redis_cluster_connect(host=host, port=port, pwd=pwd)
    Log.info(rc.cluster_nodes())

    master_list = [k for k, v in rc.cluster_nodes().items() if v.get('flags').find('master') != -1]
    Log.info(master_list)

    slots_dict = {k: v.get('slots')[0] for k, v in rc.cluster_nodes().items() if v.get('flags').find('master') != -1}
    Log.info(slots_dict)

    rc.close()

    return master_list, slots_dict, pwd


def get_redis_connection_info(redis_name: str):
    """

    :param redis_name:
    :return:
    """
    from xxx_use.common_functions import read_ini

    redis_config = read_ini('redis.ini').get(redis_name)
    redis_config_dict = dict(redis_config)

    host = redis_config_dict.get('host')
    port = redis_config_dict.get('port')
    pwd = redis_config_dict.get('pwd')

    return host, port, pwd


def redis_cluster_connect(host: str, port: int, pwd: str):
    """

    :param host:
    :param port:
    :param pwd:
    :return:
    """
    Log.info(f'{host}, {port}, {pwd}')

    if random.getrandbits(1):

        nodes = [ClusterNode(host, port)]
        rc = RedisCluster(startup_nodes=nodes, password=pwd, decode_responses=True)

    else:

        rc = RedisCluster(host=host, port=port, password=pwd, decode_responses=True)

    Log.info(rc)
    Log.debug(rc.__dict__)

    return rc


def cluster_get_key_Use_scan_iter(key_name: str, rc: RedisCluster, scan_count: int = 5000,
                                  key_type: str = 'string', only_need_name: bool = False):
    """

    :param key_name:
    :param rc:
    :param scan_count:
    :param key_type:
    :param only_need_name:
    :return:
    """
    assert key_type in ['string', 'hash', 'list', 'set']
    Log.info(key_name)

    res_dict = dict()

    names_generator = rc.scan_iter(match=key_name, count=scan_count)

    for names in names_generator:

        if only_need_name:  # 只要 key

            names_dict = {names: None}
            res_dict.update(names_dict)

        else:
            # key + value

            value = rc.get(names) if key_type == 'string' else (rc.hgetall(names) if key_type == 'hash' else
                                                                (rc.lrange(names, 0, -1) if key_type == 'list'
                                                                 else rc.smembers(names)))
            res_dict.update({names: value})

    Log.info('scan_iter 找到相关key的数量:{}'.format(len(res_dict)))

    # # 和keys命令的结果 做个校验
    # assert len(res_dict) == len(cluster_get_key_Use_KEYS(key_name=key_name, rc=rc))

    rd_copy = res_dict.copy()
    len_rd = 10 if len(rd_copy) > 10 else len(rd_copy)
    Log.info([rd_copy.popitem() for _ in range(len_rd)])

    return res_dict


def cluster_get_key_Use_SCAN(key_name: str, rc: RedisCluster, scan_count: int = 5000,
                             key_type: str = 'string', only_need_name: bool = False):
    """
    只要name时,返回的是 {key_name1:None, key_name2:None} ;要name + value时,返回的是 {key_name1:value1, key_name2:value2}
    :param key_name:
    :param rc:
    :param scan_count:
    :param key_type:
    :param only_need_name:
    :return:
    """
    assert key_type in ['string', 'hash', 'list', 'set']
    Log.info(key_name)

    primary_list = rc.get_primaries()

    res_dict = dict()

    for primary in primary_list:
        Log.debug(primary)

        prc = primary.redis_connection

        cur = 0

        while True:

            cur, names_list = prc.scan(cur, key_name, count=scan_count)

            if only_need_name:  # 只要 key
                names_dict = dict.fromkeys(names_list)
                res_dict.update(names_dict)

            else:  # key + value

                for names in names_list:
                    value = prc.get(names) if key_type == 'string' else (prc.hgetall(names) if key_type == 'hash' else
                                                                         (prc.lrange(names, 0, -1) if key_type == 'list'
                                                                          else prc.smembers(names)))
                    res_dict.update({names: value})

            if not cur:
                break

    Log.info('scan命令 找到相关key的数量:{}'.format(len(res_dict)))

    # # 和keys命令的结果 做个校验
    # assert len(res_dict) == len(cluster_get_key_Use_KEYS(key_name=key_name, rc=rc))

    rd_copy = res_dict.copy()
    len_rd = 10 if len(rd_copy) > 10 else len(rd_copy)
    Log.info([rd_copy.popitem() for _ in range(len_rd)])

    return res_dict


def cluster_get_key_Use_KEYS(key_name: str, rc: RedisCluster, target_nodes: str = RedisCluster.PRIMARIES):
    """

    :param key_name:
    :param rc:
    :param target_nodes:
    :return:
    """

    # keys命令 使用: 只能在测试环境 + 单独业务使用的集群
    data_list = rc.keys(key_name, target_nodes=target_nodes)

    Log.info(f'keys命令 查找到的所有key的数量:{len(data_list)}')
    Log.info(data_list[-10:])

    return data_list


def cluster_unlink_key_Use_SCAN(key_name: str, rc: RedisCluster, scan_count: int = 5000):
    """

    :param key_name:
    :param rc:
    :param scan_count:
    :return:
    """

    res_dict = cluster_get_key_Use_SCAN(key_name=key_name, scan_count=scan_count,
                                        rc=rc, only_need_name=True)

    res_list = list(res_dict.keys())

    cluster_for_execute_command(rc=rc, command='unlink', exe_nums=1000, data=res_list)


def cluster_for_execute_command(rc: RedisCluster, command: str, exe_nums: int, data: List,
                                unpacking: bool = True, **kwargs):
    """
    
    :param rc: 
    :param command: 
    :param exe_nums: 
    :param data: 
    :param unpacking: 
    :param kwargs: 
    :return: 
    """""

    # 集群 操作某个 | 某些 key时,先 计算slot,再 向target node推命令:具体操作命令 + 某个|某些key。

    # 想 直接在某节点 删掉SCAN出来在此节点的 key,报错:CLUSTER_REDIR_CROSS_SLOT

    method = list(BasicKeyCommands.__dict__.keys()) + list(HashCommands.__dict__.keys()) + list(
        ListCommands.__dict__.keys()) + list(SetCommands.__dict__.keys())

    assert command in [m for m in method if m.startswith('__') is False]

    Log.error(command)

    res = list()

    for i in range(math.ceil(len(data) / exe_nums)):
        start = i * exe_nums

        if exe_nums == 1:

            ele = data[start]  # 直接取这个元素

        else:

            ele = data[start: start + exe_nums]

        if unpacking:

            res.append(getattr(rc, command)(*ele, **kwargs))

        else:
            res.append(getattr(rc, command)(ele, **kwargs))

    else:

        Log.info('全部搞完: {}条'.format(len(data)))
        Log.info(res[-10:])

        time.sleep(1)

        return res


if __name__ == '__main__':
    Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')

    rc_m = redis_cluster_connect(*get_redis_connection_info('xie'))

    nums_m = random.randint(10000, 99999)
    insert_nums = 19
    for_ = range(nums_m, nums_m + insert_nums)

    Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')

    # cluster_for_execute_command(rc=rc_m, command='set',
    #                             data=[('TEST_xie_s' + str(i), 'String_value' + str(i)) for i in for_],
    #                             exe_nums=1, ex=3600 * 24 * 7)
    #
    # cluster_for_execute_command(rc=rc_m, command='get',
    #                             data=['TEST_xie_s' + str(i) for i in for_],
    #                             exe_nums=1, unpacking=False)
    #
    # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
    #
    # # hset() 执行时,默认 传的是key、value;
    #
    # cluster_for_execute_command(rc=rc_m, command='hset',
    #                             data=[('TEST_xie_h' + str(i), 'h_field', 'Hash_value' + str(i)) for i in for_],
    #                             exe_nums=1)
    #
    # cluster_for_execute_command(rc=rc_m, command='hset',
    #                             data=['TEST_xie_h' + str(i) for i in for_],
    #                             exe_nums=1, unpacking=False, key='h_field相同的', value='h_value相同的')
    #
    # cluster_for_execute_command(rc=rc_m, command='hset',
    #                             data=['TEST_xie_h' + str(i) for i in for_],
    #                             exe_nums=1, unpacking=False,
    #                             mapping={'h_field相同的': 'h_value相同的', 'h_field1': 'h_value1'})
    #
    # cluster_for_execute_command(rc=rc_m, command='hset',
    #                             data=[('TEST_xie_h' + str(i), 'h_field' + str(i), 'H_value' + str(i)) for i in for_],
    #                             exe_nums=1, mapping={'h_field相同的': 'h_value相同的'})
    #
    # Log.info('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
    #
    # # 若是 多个field ,建议:直接对所有key做个循环,使用hset,传mapping。
    # for i in for_:
    #     h_dict = {'hash__' + str(i): 'value__1' + str(i), 'hash' + str(i): 'value' + str(i)}
    #     cluster_for_execute_command(rc=rc_m, command='hset', data=['TEST_xie_h' + str(i)],
    #                                 exe_nums=1, unpacking=False, mapping=h_dict)
    #
    #     cluster_for_execute_command(rc=rc_m, command='hgetall',
    #                                 data=['TEST_xie_h' + str(i)],
    #                                 exe_nums=1, unpacking=False)
    #
    # Log.info('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
    #
    # cluster_for_execute_command(rc=rc_m, command='hgetall',
    #                             data=['TEST_xie_h' + str(i) for i in for_],
    #                             exe_nums=1, unpacking=False)
    #
    # cluster_for_execute_command(rc=rc_m, command='hget',
    #                             data=[('TEST_xie_h' + str(i), 'h_field') for i in for_], exe_nums=1)
    #
    # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
    #
    # cluster_for_execute_command(rc=rc_m, command='rpush',
    #                             data=[('TEST_xie_l' + str(i), 'List_value' + str(i), 'lv1', 'lv2') for i in for_],
    #                             exe_nums=1)
    #
    # cluster_for_execute_command(rc=rc_m, command='lrange',
    #                             data=['TEST_xie_l' + str(i) for i in for_],
    #                             exe_nums=1, unpacking=False, start=0, end=-1)
    #
    # cluster_for_execute_command(rc=rc_m, command='lrange',
    #                             data=[('TEST_xie_l' + str(i), 0, -1) for i in for_],
    #                             exe_nums=1)

    # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
    #
    # cluster_for_execute_command(rc=rc_m, command='sadd',
    #                             data=[('TEST_xie_set' + str(i), 'sv_' + str(i), 'sv1', 'sv2', 'sv3') for i in for_],
    #                             exe_nums=1)
    #
    # cluster_for_execute_command(rc=rc_m, command='smembers',
    #                             data=['TEST_xie_set' + str(i) for i in for_],
    #                             exe_nums=1, unpacking=False)
    #
    # cluster_for_execute_command(rc=rc_m, command='unlink',
    #                             data=['TEST_xie_set' + str(i) for i in for_],
    #                             exe_nums=50)
    #
    # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')


    kn_m = 'TEST_xie*'
    cluster_unlink_key_Use_SCAN(key_name=kn_m, rc=rc_m)

    # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, only_need_name=True)
    # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, only_need_name=True)
    #
    # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m)
    # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m)
    #
    # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, key_type='hash')
    # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, key_type='hash')
    #
    # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, key_type='list')
    # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, key_type='list')
    #
    # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, key_type='set')
    # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, key_type='set')

    rc_m.close()

    Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')

客户端:RedisInsight

https://redis.com/redis-enterprise/redis-insight/

在这里插入图片描述

本文链接:https://blog.csdn.net/zyooooxie/article/details/135485606

个人博客 https://blog.csdn.net/zyooooxie

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/348041.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

2021 Google Chrome RCE漏洞分析

一、复现环境: Win10 Google Chrome 86.0.4240.75 二、利用复现: 关闭沙箱安全使用命令进行关闭 ,在正常情况下,浏览器沙箱提供了一个受限制的执行环境,以防止恶意代码对用户系统的损害。关闭沙箱可能会导致浏览器执…

查询排序(1)

Oracle从入门到总裁:https://blog.csdn.net/weixin_67859959/article/details/135209645 前面介绍了在 SQL 限定查询中 WHERE 子句的运行顺序优先于 SELECT 子句,WHERE 子句确定数据行,SELECT 子句确定数据列。 也分别讲述了在 WHERE 子句中常用的运算…

相机拍摄基础

相机拍摄 1.索尼A7M3摄影机挡位 AUTO自动档,光圈快门自动调整。 P档半自动档,只能调整感光度,光圈快门随之变化。 A档,光圈优先,只能调整光圈值,快门随之变化。适合拍摄风景、人像。 S档,快…

SpringBoot整合redisson实现分布式锁

SpringBoot整合redisson实现分布式锁 本文主要通过 SpringBoot 整合 redisson 来实现分布式锁&#xff0c;并结合 demo 测试结果。 1、pom依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0…

【Linux】 开始使用 gcc 吧!!!

Linux 1 认识gcc2 背景知识3 gcc 怎样完成 &#xff1f;3.1 预处理预处理^条件编译 3.2 编译3.3 汇编3.4 链接 4 函数库5 gcc 基本选项Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读下一篇文章见&#xff01;&#xff01;&#xff01; 1 认识gcc 我们在windows环…

【LIBS】交叉编译TCPDUMP

目录 1. 安装编译工具2. 设置环境变量3. 编译libpcap3.1 安装依赖3.2 交叉编译 4. 编译TCPDUMP4.1 克隆仓库与生成构建环境4.2 静态链接LIBPCAP4.3 动态链接LIBPCAP4.4 构建与安装 5. 查看交叉编译结果5.1 文件布局 1. 安装编译工具 sudo apt-get install -y autoconf automak…

SaaS系统如何助力企业数字化转型

随着科技的快速发展&#xff0c;数字化转型已经成为企业适应市场变化、提高竞争力的必要手段。在这个过程中&#xff0c;SaaS&#xff08;软件即服务&#xff09;系统以其独特的优势&#xff0c;正在成为越来越多企业的首选。乔拓云SaaS系统作为这一领域的佼佼者&#xff0c;更…

谷歌出品!读懂 QUIC 协议:更快、更高效的通信协议

QUIC结构 QUIC协议模型如下图所示&#xff0c;其放弃了TCP∕IP网络中使用五元组(源IP,源端口,目的IP,目的端口,协议标识符)来唯一标识一条连接的方式,而使用一个全局唯一的随机生成的ID(即Connection ID) 来标识一条连接。 由低向上分层讨论QUIC协议&#xff1a; •UDP层:在U…

1990-2019年城市维度区域创新创业指数面板数据/地级市创新创业指数面板数据

1990-2019年城市维度区域创新创业指数面板数据/地级市创新创业指数面板数据 1、时间&#xff1a;1990-2019年 2、范围&#xff1a;地级市&#xff08;290&#xff09; 3、指标&#xff1a;序号、年份、城市码、城市、总维度&#xff1a;总量指数得分、人均得分、单位面积得分…

应用协议漏洞

应用协议漏洞 一、rsync rsync是Linux下一款数据备份工具&#xff0c;支持通过rsync协议、ssh协议进行远程文件传输。其中rsync协议默认监听873端口 1.未授权访问 打开靶场 判断漏洞是否存在 rsync rsync://目标ip:端口读取文件 rsync rsync://47.99.49.128:873/src/tmp/下…

访问者模式-C#实现

该实例基于WPF实现&#xff0c;直接上代码&#xff0c;下面为三层架构的代码。 一 Model using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks;namespace 设计模式练习.Model.访问者模式 {public class Com…

JRT的无源码发布

之前介绍过JRT最大的特点就是业务脚本化。老javaer就会说你业务代码都在发布环境放着&#xff0c;那怎么代码保密&#xff0c;在发布环境别人随便改了启不是不安全&#xff0c;或者一些代码我就是不想让人看源码呢。 其实JRT的业务脚本化只是特性&#xff0c;不是代表就必须要…

如何进行H.265视频播放器EasyPlayer.js的中性化设置?

H5无插件流媒体播放器EasyPlayer属于一款高效、精炼、稳定且免费的流媒体播放器&#xff0c;可支持多种流媒体协议播放&#xff0c;可支持H.264与H.265编码格式&#xff0c;性能稳定、播放流畅&#xff0c;能支持WebSocket-FLV、HTTP-FLV&#xff0c;HLS&#xff08;m3u8&#…

C#用DateAndTime.DateDiff方法和TimeSpan分别计算时间间隔

目录 一、计算时间间隔的方法 1.用DateAndTime.DateDiff方法计算时间间隔 2.使用TimeSpan获取日期时间间隔 二、实例 1.示例一&#xff1a;用DateAndTime.DateDiff方法计算时间间隔 2.示例二&#xff1a;使用TimeSpan获取日期时间间隔 一、计算时间间隔的方法 1.用Date…

深入《羊了个羊》:从0到1的消除游戏开发

一、游戏简介 《羊了个羊》是一款备受欢迎的消除类游戏。玩家需要通过交换相邻的方块&#xff0c;使三个或更多相同方块连成一线&#xff0c;从而将它们消除。消除方块可以获得分数&#xff0c;并在全球排行榜上与其他玩家竞争。 设置项目结构 首先&#xff0c;在文本编辑器中…

【博客搭建记录贴】day4_Hexo基本操作,添加草稿并发布

目录 1.将项目导入到开发环境1.1 先把项目导入到IDEA中1.2 确认IDEA中服务器启动正常 2.Hexo基本操作: 添加草稿并发布2.1 生成一个草稿文件2.2 在页面上查看草稿3.3 将草稿正式发布 1.将项目导入到开发环境 我本地已经安装了 IntelliJ IDEA&#xff08;版本&#xff1a;社区版…

【modelsim使用】数据显示设置

本文介绍modelsim使用中数据的显示设置&#xff0c;定点小数的显示、模拟波形的显示、数据截位查看、信号颜色和行高设置的操作。 文章目录 定点小数显示模拟波形的显示选取信号的某几位组合查看信号颜色与行高设置 定点小数显示 使用modelsim进行仿真时&#xff0c;涉及到定点…

【GitHub项目推荐--一款美观的开源社区系统】【转载】

推荐一款开源社区系统&#xff0c;该系统基于主流的 Java Web 技术栈&#xff0c;如果你是一名 Java 新手掌握了基本 JavaEE 框架知识&#xff0c;可以拿本项目作为练手项目。 开源社区系统功能还算完善包含发布帖子、发布评论、私信、系统通知、点赞、关注、搜索、用户设置、…

【MySQL】内外连接

内外连接 一、内连接二、外连接1、左外连接2、右外连接 表的连接分为内连和外连。 一、内连接 内连接实际上就是利用where子句对两种表形成的笛卡儿积进行筛选。只不过为了让sql的可读性更好&#xff0c;我们使用其他的关键字进行内连接。 语法&#xff1a; SELECT ... FRO…

BGP路由协议通告原则

1仅将自己最优的路由发给BGP邻居 一般情况下,如果BGP Speaker学到去往同一网段的路由多于一条时,只会选择一条最优的路由给自己使用,即用来发布给邻居,同时上送给IP路由表。但是,由于路由器也会选择最优的路由给自己使用,所以BGP Speaker本身选择的最优的路由也不一定被…