本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/135485606
之前写过 2 篇操作 Redis 集群的博客:https://blog.csdn.net/zyooooxie/article/details/123760358 、 https://blog.csdn.net/zyooooxie/article/details/112484045 ;这一期再分享一下我日常是怎么用的。
【实际这篇博客推迟发布N个月】
个人博客:https://blog.csdn.net/zyooooxie
【以下所有内容仅为个人项目经历,如有不同,纯属正常】
代码 : redis-py
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""
import math
import time
import random
from deprecated import deprecated
from typing import Optional, Union, List
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
from redis.commands.core import BasicKeyCommands, ListCommands, SetCommands, HashCommands
from user_log import Log
# pip install redis==4.4.1
@deprecated(reason="不推荐使用")
def redis_connect(host: str, port: int, pwd: str):
    """
    Create a client for a single Redis server (database 0).

    The function is flagged with ``@deprecated``.

    :param host: host passed to ``Redis(host=...)``
    :param port: port passed to ``Redis(port=...)``
    :param pwd: password passed to ``Redis(password=...)``
    :return: the ``redis.Redis`` client, created with ``decode_responses=True``
    """
    # Do not leak the credential into the log: only show whether one was given.
    masked_pwd = '***' if pwd else pwd
    Log.info(f'{host}, {port}, {masked_pwd}')

    # Function-scope import, kept from the original code.
    from redis import Redis

    r = Redis(host=host, port=port, decode_responses=True, password=pwd, db=0)
    Log.info(r)

    return r
@deprecated(reason="不推荐使用")
def get_cluster_info(host: str, port: int, pwd: str):
    """
    Connect to the cluster and report which nodes are masters.

    The function is flagged with ``@deprecated``.

    :param host: host forwarded to ``redis_cluster_connect``
    :param port: port forwarded to ``redis_cluster_connect``
    :param pwd: password forwarded to ``redis_cluster_connect``
    :return: ``(master_list, slots_dict, pwd)`` where ``master_list`` holds the
        keys of ``cluster_nodes()`` whose ``flags`` contain ``'master'`` and
        ``slots_dict`` maps each of those keys to the first entry of its
        ``slots`` value (presumably the first slot range - TODO confirm)
    :raises IndexError: if a master reports an empty ``slots`` list
    """
    rc = redis_cluster_connect(host=host, port=port, pwd=pwd)

    try:
        # Query the topology once so the log and both results describe the same snapshot.
        nodes = rc.cluster_nodes()
        Log.info(nodes)

        masters = {k: v for k, v in nodes.items() if 'master' in v.get('flags')}

        master_list = list(masters)
        Log.info(master_list)

        slots_dict = {k: v.get('slots')[0] for k, v in masters.items()}
        Log.info(slots_dict)
    finally:
        # Release the connection even when one of the steps above raises.
        rc.close()

    return master_list, slots_dict, pwd
def get_redis_connection_info(redis_name: str):
    """
    Look up the connection settings stored under ``redis_name`` in ``redis.ini``.

    :param redis_name: name of the entry to read from the parsed ini content
    :return: tuple ``(host, port, pwd)``; a field that is absent comes back as ``None``
    """
    # Function-scope import, kept from the original code.
    from xxx_use.common_functions import read_ini

    section = dict(read_ini('redis.ini').get(redis_name))

    # Same order as the positional parameters of the connect helpers.
    return tuple(section.get(field) for field in ('host', 'port', 'pwd'))
def redis_cluster_connect(host: str, port: int, pwd: str):
    """
    Create a ``RedisCluster`` client with ``decode_responses=True``.

    One of two construction styles is picked at random on every call:
    passing ``startup_nodes=[ClusterNode(host, port)]`` or passing
    ``host`` / ``port`` directly.

    :param host: host of the node used to reach the cluster
    :param port: port of that node
    :param pwd: password passed as ``password=``
    :return: the ``RedisCluster`` client
    """
    # Do not leak the credential into the log: only show whether one was given.
    masked_pwd = '***' if pwd else pwd
    Log.info(f'{host}, {port}, {masked_pwd}')

    if random.getrandbits(1):
        nodes = [ClusterNode(host, port)]
        rc = RedisCluster(startup_nodes=nodes, password=pwd, decode_responses=True)
    else:
        rc = RedisCluster(host=host, port=port, password=pwd, decode_responses=True)

    Log.info(rc)
    Log.debug(rc.__dict__)

    return rc
def cluster_get_key_Use_scan_iter(key_name: str, rc: RedisCluster, scan_count: int = 5000,
                                  key_type: str = 'string', only_need_name: bool = False):
    """
    Collect the keys matching ``key_name`` through ``rc.scan_iter``.

    :param key_name: pattern passed as ``match=`` to ``scan_iter``
    :param rc: cluster client used for scanning and for reading values
    :param scan_count: value passed as ``count=`` to ``scan_iter``
    :param key_type: one of ``'string'``, ``'hash'``, ``'list'``, ``'set'``;
        selects the read command (``get`` / ``hgetall`` / ``lrange 0 -1`` / ``smembers``)
    :param only_need_name: when true no value is read and every key maps to ``None``
    :return: dict ``{key: value}`` (or ``{key: None}`` when ``only_need_name`` is set)
    """
    assert key_type in ['string', 'hash', 'list', 'set']
    Log.info(key_name)

    def read_value(name):
        # Pick the read command that corresponds to the declared key type.
        if key_type == 'string':
            return rc.get(name)
        if key_type == 'hash':
            return rc.hgetall(name)
        if key_type == 'list':
            return rc.lrange(name, 0, -1)
        return rc.smembers(name)

    found = {}
    for name in rc.scan_iter(match=key_name, count=scan_count):
        found[name] = None if only_need_name else read_value(name)

    Log.info('scan_iter 找到相关key的数量:{}'.format(len(found)))

    # # Cross-check against the result of the KEYS command
    # assert len(found) == len(cluster_get_key_Use_KEYS(key_name=key_name, rc=rc))

    # Log a sample: at most the 10 most recently inserted items, newest first.
    Log.info(list(found.items())[:-11:-1])

    return found
def cluster_get_key_Use_SCAN(key_name: str, rc: RedisCluster, scan_count: int = 5000,
                             key_type: str = 'string', only_need_name: bool = False):
    """
    Collect the keys matching ``key_name`` by running SCAN on every primary node.

    With ``only_need_name`` the result is ``{key_name1: None, key_name2: None}``;
    otherwise it is ``{key_name1: value1, key_name2: value2}``.

    :param key_name: match pattern given to ``scan``
    :param rc: cluster client; its ``get_primaries()`` nodes are scanned one by one
    :param scan_count: value passed as ``count=`` to each ``scan`` call
    :param key_type: one of ``'string'``, ``'hash'``, ``'list'``, ``'set'``;
        selects the read command (``get`` / ``hgetall`` / ``lrange 0 -1`` / ``smembers``)
    :param only_need_name: when true no value is read and every key maps to ``None``
    :return: dict of the keys found on all primaries
    """
    assert key_type in ['string', 'hash', 'list', 'set']
    Log.info(key_name)

    found = {}
    for node in rc.get_primaries():
        Log.debug(node)

        # Values are read through the same per-node connection that produced the key.
        conn = node.redis_connection

        cursor = 0
        while True:
            cursor, batch = conn.scan(cursor, key_name, count=scan_count)

            for name in batch:
                if only_need_name:
                    found[name] = None
                elif key_type == 'string':
                    found[name] = conn.get(name)
                elif key_type == 'hash':
                    found[name] = conn.hgetall(name)
                elif key_type == 'list':
                    found[name] = conn.lrange(name, 0, -1)
                else:
                    found[name] = conn.smembers(name)

            # A falsy cursor means this node has been fully iterated.
            if not cursor:
                break

    Log.info('scan命令 找到相关key的数量:{}'.format(len(found)))

    # # Cross-check against the result of the KEYS command
    # assert len(found) == len(cluster_get_key_Use_KEYS(key_name=key_name, rc=rc))

    # Log a sample: at most the 10 most recently inserted items, newest first.
    Log.info(list(found.items())[:-11:-1])

    return found
def cluster_get_key_Use_KEYS(key_name: str, rc: RedisCluster, target_nodes: str = RedisCluster.PRIMARIES):
    """
    Fetch the keys matching ``key_name`` with the KEYS command.

    :param key_name: pattern handed to ``rc.keys``
    :param rc: cluster client
    :param target_nodes: forwarded as ``target_nodes=``; defaults to ``RedisCluster.PRIMARIES``
    :return: whatever ``rc.keys`` returns (sliced and measured like a list here)
    """
    # Author's note: use KEYS only in a test environment on a cluster dedicated to one business.
    matched = rc.keys(key_name, target_nodes=target_nodes)

    Log.info(f'keys命令 查找到的所有key的数量:{len(matched)}')

    # Log only the tail so the output stays short.
    tail = matched[-10:]
    Log.info(tail)

    return matched
def cluster_unlink_key_Use_SCAN(key_name: str, rc: RedisCluster, scan_count: int = 5000):
    """
    UNLINK every key matching ``key_name``.

    The names are gathered with ``cluster_get_key_Use_SCAN`` (names only) and
    then handed to ``cluster_for_execute_command`` in batches of 1000.

    :param key_name: match pattern of the keys to remove
    :param rc: cluster client
    :param scan_count: ``count`` hint forwarded to the scan helper
    :return: None
    """
    found = cluster_get_key_Use_SCAN(key_name=key_name, rc=rc,
                                     scan_count=scan_count, only_need_name=True)

    cluster_for_execute_command(rc=rc, command='unlink', exe_nums=1000, data=list(found))
def cluster_for_execute_command(rc: RedisCluster, command: str, exe_nums: int, data: List,
                                unpacking: bool = True, **kwargs):
    """
    Run ``command`` on ``rc`` over ``data``, ``exe_nums`` elements per call.

    How each call is built:

    * ``exe_nums == 1``: the element itself is used; otherwise a slice of up to
      ``exe_nums`` elements is used.
    * ``unpacking`` true: the element/slice is star-unpacked into positional
      arguments, so with ``exe_nums == 1`` every element must itself be an
      iterable of arguments (a bare ``str`` would be split into characters).
    * ``unpacking`` false: the element/slice is passed as one positional argument.

    After all calls the function logs a summary and sleeps for one second.

    :param rc: cluster client on which the command method is looked up
    :param command: name of a non-dunder attribute defined on ``BasicKeyCommands``,
        ``HashCommands``, ``ListCommands`` or ``SetCommands``
    :param exe_nums: number of elements of ``data`` consumed per call; must be >= 1
    :param data: the elements to process
    :param unpacking: see above
    :param kwargs: extra keyword arguments forwarded to every call
    :return: list with the result of each call, in call order
    :raises AssertionError: if ``command`` is not in the allowed set
    :raises ValueError: if ``exe_nums`` is smaller than 1
    """
    # Author's notes: in a cluster, operating on one or several keys means computing the slot
    # first and then sending the command (with the key/keys) to the target node.
    # Deleting, directly on one node, the keys SCANned from that node failed with
    # CLUSTER_REDIR_CROSS_SLOT.
    allowed = set()
    for commands_cls in (BasicKeyCommands, HashCommands, ListCommands, SetCommands):
        allowed.update(name for name in vars(commands_cls) if not name.startswith('__'))

    assert command in allowed

    # Previously 0 raised ZeroDivisionError and a negative value silently executed nothing
    # while still logging completion; reject both explicitly.
    if exe_nums < 1:
        raise ValueError(f'exe_nums must be >= 1, got {exe_nums!r}')

    Log.error(command)

    execute = getattr(rc, command)

    res = list()
    for start in range(0, len(data), exe_nums):
        # With a batch size of 1 the element itself is used, not a one-item slice.
        ele = data[start] if exe_nums == 1 else data[start: start + exe_nums]

        if unpacking:
            res.append(execute(*ele, **kwargs))
        else:
            res.append(execute(ele, **kwargs))

    Log.info('全部搞完: {}条'.format(len(data)))
    Log.info(res[-10:])

    time.sleep(1)

    return res
if __name__ == '__main__':
    # Demo entry point: connects to the cluster configured as 'xie', optionally runs the
    # sample commands below (kept commented out), then unlinks every key matching 'TEST_xie*'.
    Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')

    rc_m = redis_cluster_connect(*get_redis_connection_info('xie'))

    # try/finally so the connection is closed even when one of the operations raises.
    try:
        # Random numeric suffix range used by the sample commands to build key names.
        nums_m = random.randint(10000, 99999)
        insert_nums = 19
        for_ = range(nums_m, nums_m + insert_nums)

        Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')

        # cluster_for_execute_command(rc=rc_m, command='set',
        #                             data=[('TEST_xie_s' + str(i), 'String_value' + str(i)) for i in for_],
        #                             exe_nums=1, ex=3600 * 24 * 7)
        #
        # cluster_for_execute_command(rc=rc_m, command='get',
        #                             data=['TEST_xie_s' + str(i) for i in for_],
        #                             exe_nums=1, unpacking=False)
        #
        # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
        #
        # # When hset() is executed, what is passed by default is key and value.
        #
        # cluster_for_execute_command(rc=rc_m, command='hset',
        #                             data=[('TEST_xie_h' + str(i), 'h_field', 'Hash_value' + str(i)) for i in for_],
        #                             exe_nums=1)
        #
        # cluster_for_execute_command(rc=rc_m, command='hset',
        #                             data=['TEST_xie_h' + str(i) for i in for_],
        #                             exe_nums=1, unpacking=False, key='h_field相同的', value='h_value相同的')
        #
        # cluster_for_execute_command(rc=rc_m, command='hset',
        #                             data=['TEST_xie_h' + str(i) for i in for_],
        #                             exe_nums=1, unpacking=False,
        #                             mapping={'h_field相同的': 'h_value相同的', 'h_field1': 'h_value1'})
        #
        # cluster_for_execute_command(rc=rc_m, command='hset',
        #                             data=[('TEST_xie_h' + str(i), 'h_field' + str(i), 'H_value' + str(i)) for i in for_],
        #                             exe_nums=1, mapping={'h_field相同的': 'h_value相同的'})
        #
        # Log.info('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
        #
        # # For several fields, the suggestion is to loop over all keys and call hset with mapping.
        # for i in for_:
        #     h_dict = {'hash__' + str(i): 'value__1' + str(i), 'hash' + str(i): 'value' + str(i)}
        #     cluster_for_execute_command(rc=rc_m, command='hset', data=['TEST_xie_h' + str(i)],
        #                                 exe_nums=1, unpacking=False, mapping=h_dict)
        #
        #     cluster_for_execute_command(rc=rc_m, command='hgetall',
        #                                 data=['TEST_xie_h' + str(i)],
        #                                 exe_nums=1, unpacking=False)
        #
        # Log.info('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
        #
        # cluster_for_execute_command(rc=rc_m, command='hgetall',
        #                             data=['TEST_xie_h' + str(i) for i in for_],
        #                             exe_nums=1, unpacking=False)
        #
        # cluster_for_execute_command(rc=rc_m, command='hget',
        #                             data=[('TEST_xie_h' + str(i), 'h_field') for i in for_], exe_nums=1)
        #
        # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
        #
        # cluster_for_execute_command(rc=rc_m, command='rpush',
        #                             data=[('TEST_xie_l' + str(i), 'List_value' + str(i), 'lv1', 'lv2') for i in for_],
        #                             exe_nums=1)
        #
        # cluster_for_execute_command(rc=rc_m, command='lrange',
        #                             data=['TEST_xie_l' + str(i) for i in for_],
        #                             exe_nums=1, unpacking=False, start=0, end=-1)
        #
        # cluster_for_execute_command(rc=rc_m, command='lrange',
        #                             data=[('TEST_xie_l' + str(i), 0, -1) for i in for_],
        #                             exe_nums=1)
        # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
        #
        # cluster_for_execute_command(rc=rc_m, command='sadd',
        #                             data=[('TEST_xie_set' + str(i), 'sv_' + str(i), 'sv1', 'sv2', 'sv3') for i in for_],
        #                             exe_nums=1)
        #
        # cluster_for_execute_command(rc=rc_m, command='smembers',
        #                             data=['TEST_xie_set' + str(i) for i in for_],
        #                             exe_nums=1, unpacking=False)
        #
        # cluster_for_execute_command(rc=rc_m, command='unlink',
        #                             data=['TEST_xie_set' + str(i) for i in for_],
        #                             exe_nums=50)
        #
        # Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')

        kn_m = 'TEST_xie*'

        cluster_unlink_key_Use_SCAN(key_name=kn_m, rc=rc_m)

        # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, only_need_name=True)
        # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, only_need_name=True)
        #
        # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m)
        # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m)
        #
        # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, key_type='hash')
        # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, key_type='hash')
        #
        # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, key_type='list')
        # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, key_type='list')
        #
        # cluster_get_key_Use_SCAN(key_name=kn_m, rc=rc_m, key_type='set')
        # cluster_get_key_Use_scan_iter(key_name=kn_m, rc=rc_m, key_type='set')
    finally:
        rc_m.close()

    Log.error('🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀🚀')
客户端:RedisInsight
https://redis.com/redis-enterprise/redis-insight/
本文链接:https://blog.csdn.net/zyooooxie/article/details/135485606
个人博客 https://blog.csdn.net/zyooooxie