背景
承接前文,由于上一个app 爬出来的数据只有 1w 多条,感觉不是很过瘾
所以这次又找到了一个非破解版 app,数据量大概有 40w,安全等级直线上升
声明
本次爬虫是学习实践行为,获取到的数据均已在 24 小时内全部删除,请大家不要轻易模仿,维护网络生态,共建美好生活
抓包分析
首先通过 Reqable 和 mumu 安卓模拟器进行抓包分析
得到如下接口:
通过接口分析可知,该 app 有登录注册,并且搜索接口有分页功能,还对 sex 参数进行了区分,从而查出不同接口
这里主要对 search 接口进行 爬取
难点
浅尝辄止后发现了以下难点:
难点 1:搜索接口有次数限制,普通用户一天只能搜索 6 次,svip 无限次
数次尝试后,找到了无限获取积分的漏洞,可以靠积分进行svip 的兑换
难点 2:就算有 svip 了,还会有风控机制,如果一个用户 token 就行搜索接口访问,太频繁了会封号(0-2 秒一次的访问频率)
这里其实也可以用线程池解决,不用线程池携带不同 token,不过我还是采用了单线程,保证对方服务器的稳定运行,不给对方造成任何经济上的损失
难点 3:经多次验证后发现,单个 token 日访问次数达到 730 次后,会被封号
难点 4:有分页机制,且 10 只会展示 10 页,
有时候 11 页会重复展示 10页的内容,这种情况会有无限页;
有时候 11 页数据为空
需要考虑到这两种情况
解决方案
迭代 1:还是用单字作为入参,5000 多行入参去访问,拿到数据后进行去重等处理
迭代 2:增加 740 次调用限制,防止 token 被封号(经测试后发现,第二天还可以重复利用)
迭代 3:一步步实验后,手上的 token 多了起来,于是创建了个 csv 文本,写了个 token 管理脚本,实现了 token 自动化
迭代 4:越来越懒,接入了接码平台,如果没有 token 了,可以自动注册一个,自此,脚本实现全自动化
脚本
Search 脚本
import json
import logging
import os
import random
import sys
import time
import warnings
from datetime import datetime
from typing import List, Dict, Tuple
import subprocess
import pandas as pd
import requests
from token_manager import get_available_token, update_token_status
# 禁用所有警告
warnings.filterwarnings('ignore')
# 配置日志格式和处理器
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
# 配置参数
BASE_URL = 'https://www.123456.com/index.php/search' # API接口地址
MAX_WORKERS = 1 # 并发线程数
BATCH_SIZE = 10 # 批量保存数据的大小
MAX_RETRIES = 3 # 请求失败时的最大重试次数
REQUEST_TIMEOUT = 10 # 请求超时时间(秒)
MAX_API_CALLS = 750 # API调用次数上限
# 添加代理配置
PROXIES = {
'http': 'http://127.0.0.1:7890',
'https': 'http://127.0.0.1:7890'
}
# HTTP请求头配置
HEADERS = {
'Accept': '*/*',
'Accept-Encoding': 'gzip',
'Connection': 'keep-alive',
'User-Agent': 'okhttp-okgo/jeasonlzy',
'accept-language': 'zh-CN,zh;q=0.8'
}
# API请求的默认参数
DEFAULT_PARAMS = {
'sex': '2', # 性别类型
'package': 'com.xxx', # 应用包名
'device_id': 'xxx', # 设备ID
'sign': 'xxx', # 签名
'uuid': 'dddddd-c4c8-5aba-ffff-ffffef05ac4a1', # 设备UUID
'token': 'd36cd8735e1a12345656919fb66', # 用户令牌
'verid': '1', # 版本ID
'system_version': 'Android 14', # 系统版本
'agentname': 'xxx', # 代理名称
'appid': 'xxx', # 应用ID
'imei': '123124142124', # IMEI号
'from': '1', # 来源
'userua': 'Mozilla/5.0 (Linux; Android 12; SM-S9180 Build/xxx; wv) AppleWebKit/xxx (KHTML, like Gecko) Version/4.0 Chrome/11.1.1111.1 Mobile Safari/111.11', # 用户代理
'android_id': '123124124124', # 安卓ID
'device': 'SM-1234', # 设备型号
'oaid': '', # OAID
'timestamp': '1733994424' # 时间戳
}
# 全局变量
query_counter = 0 # 查询计数器
api_call_counter = 0 # API调用计数器
existing_pks = set() # 已存在的主键集合,用于去重
start_line = 0 # 起始行号,用于断点续传
# 获取当前脚本所在目录的绝对路径
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# 获取项目根目录(假设脚本在二阶段/脚本/目录下)
PROJECT_ROOT = os.path.dirname(os.path.dirname(SCRIPT_DIR))
# 文件路径配置
INPUT_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/入参/chinese_only.txt')
VALID_INPUT_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/入参/valid_chinese.txt') # 新增:有效入参文件路径
OUTPUT_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/结果/twoSearch.csv')
ERROR_LINE_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/全局上下文/last_error_line.txt')
TOKEN_STATUS_FILE = os.path.join(PROJECT_ROOT, '二阶段/全局上下文/token_status.csv')
# 确保必要的目录存在
os.makedirs(os.path.dirname(OUTPUT_FILE_PATH), exist_ok=True)
os.makedirs(os.path.dirname(ERROR_LINE_FILE_PATH), exist_ok=True)
def terminate_program(error_msg: str, line_number: int = None, data_to_save: List[Dict] = None):
"""
终止程序的辅助函数,在终止前保存数据
"""
global api_call_counter
# 如果有数据需要保存,先进行保存
logging.info("需要保存的数据:", data_to_save)
if data_to_save:
try:
logging.info("程序终止前尝试保存已获取的数据...")
save_batch_to_csv(data_to_save, OUTPUT_FILE_PATH)
logging.info("数据保存成功")
except Exception as save_error:
logging.error(f"终止前保存数据失败: {save_error}")
# 保存行号
if line_number is not None:
logging.error(f"程序终止于第 {line_number} 行: {error_msg}")
with open(ERROR_LINE_FILE_PATH, 'w') as f:
f.write(str(line_number))
else:
logging.error(f"程序终止: {error_msg}")
sys.exit(1)
def read_input_file(file_path=INPUT_FILE_PATH) -> List[Tuple[int, str]]:
"""
从文本文件中读取单字输入,每行一个字
Args:
file_path: 文本文件路径
Returns:
List of tuples containing (line_number, character)
"""
global start_line
# 检查文件是否存在
if not os.path.exists(file_path):
logging.error(f"输入文件不存在: {file_path}")
# 尝试在当前目录查找
current_dir_path = os.path.join(os.getcwd(), file_path)
if os.path.exists(current_dir_path):
file_path = current_dir_path
logging.info(f"在当前目录找到输入文件: {current_dir_path}")
else:
terminate_program(f"找不到输入文件,已尝试以下路径:\n1. {file_path}\n2. {current_dir_path}")
return []
chars = []
total_lines = sum(1 for _ in open(file_path, 'r', encoding='utf-8'))
try:
with open(file_path, 'r', encoding='utf-8') as f:
# 跳过前start_line-1行
for _ in range(start_line - 1):
next(f, None)
# todo 从start_line开始读取,限制只读取5行用于测试
# test_limit = 1
for i, line in enumerate(f, start=start_line):
if i % 1000 == 0:
logging.info(f'读取进度: {i}/{total_lines} ({(i/total_lines*100):.2f}%)')
character = line.strip()
if not character:
continue
chars.append((i, character))
# todo 达到测试限制后退出
# if len(chars) >= test_limit:
# logging.info(f'已达到测试限制({test_limit}行),停止读取')
# break
logging.info(f'总共读取了 {len(chars)} 个字符(从第 {start_line} 行开始)')
return chars
except Exception as e:
terminate_program(f'读取文件时出错: {str(e)}')
return []
def save_valid_input(keyword: str) -> None:
"""
将有效的入参保存到新文件中
Args:
keyword: 要保存的关键词
"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(VALID_INPUT_FILE_PATH), exist_ok=True)
# 追加模式写入文件
with open(VALID_INPUT_FILE_PATH, 'a', encoding='utf-8') as f:
f.write(f"{keyword}\n")
logging.info(f'关键词 {keyword} 已保存到有效入参文件')
except Exception as e:
logging.error(f'保存有效入参时出错: {e}')
def check_api_limit(line_number: int = None, data_to_save: List[Dict] = None) -> bool:
"""
检查API调用次数是否达到限制,达到限制时尝试更换token
"""
global api_call_counter
if api_call_counter >= MAX_API_CALLS:
# 只更新调用次数和时间,不改变状态
update_token_status(DEFAULT_PARAMS['token'], None, api_call_counter)
logging.warning(f"当前token已达到API调用限制({MAX_API_CALLS}次),尝试获取新token")
# 获取新的token
new_token = get_available_token()
if new_token:
DEFAULT_PARAMS['token'] = new_token
api_call_counter = 0 # 重置计数器
logging.info(f"成功切换到新token: {new_token}")
return False
else:
logging.error("无法获取新的可用token")
return True
return False
def fetch_data_with_retry(keyword: str, query_type: str, total_queries: int, line_number: int) -> bool:
"""
带重试机制的数据获取函数
Returns:
bool: 是否需要更换token
"""
global query_counter, page_data, current_first_id, response, api_call_counter
try:
page = 1 # 当前页码
all_data = [] # 存储所有页的数据
last_first_id = None # 记录上一页第一条数据的 id
# 检查API调用次数是否超过限制
if check_api_limit(line_number, all_data):
return True
# 更新查询计数
query_counter += 1
current_count = query_counter
logging.info(f'正在处理第 {line_number} 行: {current_count}/{total_queries} - {query_type}: {keyword} - 第 {page} 页')
while True: # 循环处理所有页
try:
# 准备搜索参数
search_params = {
"keyword": keyword,
"search_type": "0",
"page": str(page)
}
# 构造完整的请求参数
params = DEFAULT_PARAMS.copy()
params['params'] = json.dumps(search_params)
params['timestamp'] = str(int(time.time()))
# 随机等待 0-1秒
time.sleep(random.uniform(2, 3))
# 发送HTTP请求
for attempt in range(MAX_RETRIES):
try:
api_call_counter += 1 # 增加API调用计数
response = requests.get(
BASE_URL,
params=params,
headers=HEADERS,
timeout=REQUEST_TIMEOUT,
verify=False,
proxies=PROXIES
)
if page > 1:
logging.info(f'正在获取第 {page} 页数据 - 关键词: {keyword}')
# 解析响应数据
data = response.json()
if data.get('code') == 200:
print(f"成功数据[调用次数:{api_call_counter}]======", response.text[:50] + "...")
else:
print(f"失败数据[调用次数:{api_call_counter}]======", response.text)
response.raise_for_status()
break # 如果请求成功,跳出重试循环
except Exception as e:
if attempt == MAX_RETRIES - 1: # 最后一次尝试失败
terminate_program(f"获取数据失败,已达到最大重试次数: {e}", line_number)
logging.warning(f"获取第 {line_number} 行 {keyword} 数据失败 (尝试 {attempt + 1}/{MAX_RETRIES}): {e}")
time.sleep(10) # 失败后等待 10 秒再重试
data = response.json()
# 检查API调用次数是否超过限制
if check_api_limit(line_number, all_data):
return True
# 处理API响应
if data.get('code') != 200:
error_msg = data.get('msg', '未知错误')
if '账号已触发风控' in error_msg:
# 更新token状态为不可用
update_token_status(DEFAULT_PARAMS['token'], 0, api_call_counter)
logging.error(f"API返回错误: {error_msg},尝试更换token")
return True
# 检查是否还有数据
page_data = data.get('data', [])
if not page_data:
if page == 1:
logging.info(f'关键词 {keyword} 没有搜索结果')
else:
logging.info(f'关键词 {keyword} 已到达最后一页: {page-1}')
break
# 如果是第一页且有数据,保存该入参
if page == 1:
save_valid_input(keyword)
# 检查当前页第一条数据的 id 是否与上一页相同
current_first_id = page_data[0].get('id')
if current_first_id == last_first_id:
logging.info(f'关键词 {keyword} 检测到重复数据,停止获取更多页')
break
# 更新上一页第一条数据的 id
last_first_id = current_first_id
# 处理返回的数据,添加额外信息
for item in page_data:
item['original_keyword'] = keyword # 原始关键词
item['line_number'] = line_number # 行号(保持原始行号)
item['page'] = page # 页码
all_data.extend(page_data)
page += 1 # 准备获取下一页
except Exception as e:
# 如果有已获取的数据,在终止前保存
if all_data:
terminate_program(f"处理数据时出错: {e}", line_number, all_data)
else:
terminate_program(f"处理数据时出错: {e}", line_number)
# 如果有数据就保存
if all_data:
save_batch_to_csv(all_data, OUTPUT_FILE_PATH)
return False
except Exception as e:
logging.error(f"处理关键词 {keyword} 时出错: {e}")
return False
def save_batch_to_csv(data_batch: List[Dict], filepath: str):
"""
批量保存数据到CSV,包含去重逻辑
Args:
data_batch: 要保存的数据批次
filepath: CSV文件路径
"""
global existing_pks, existing_df
if not data_batch:
return
try:
# 创建数据目录
os.makedirs(os.path.dirname(filepath), exist_ok=True)
# 将新数据转换为DataFrame
new_df = pd.DataFrame(data_batch)
# 如果文件已存在,读取现有数据
if os.path.exists(filepath):
existing_df = pd.read_csv(filepath, encoding='utf-8-sig')
if 'id' in existing_df.columns:
existing_pks.update(existing_df['id'].tolist())
if 'id' in new_df.columns:
# 批次内去重
new_df = new_df.drop_duplicates(subset=['id'], keep='first')
logging.info(f"批次内去重完成,剩余数据量: {len(new_df)}")
# 与已存在的id去重
new_df = new_df[~new_df['id'].isin(existing_pks)]
logging.info(f"与已存在id去重完成,剩余数据量: {len(new_df)}")
# 剩余数据量为0,则不保存
if len(new_df) == 0:
logging.info(f"剩余数据量为0,不保存")
return
# 更新已存在的id集合
existing_pks.update(new_df['id'].tolist())
# 如果文件存在,追加数据;否则创建新文件
if os.path.exists(filepath):
# 追加模式,不写入表头
new_df.to_csv(filepath, mode='a', header=False, index=False, encoding='utf-8-sig')
# 读取完整文件以获取总记录数
total_records = len(pd.read_csv(filepath, encoding='utf-8-sig'))
else:
# 新文件,写入表头
new_df.to_csv(filepath, index=False, encoding='utf-8-sig')
total_records = len(new_df)
logging.info(f"批量数据已追加到 {filepath}, 当前共有 {total_records} 条记录")
except Exception as e:
# 保存为JSON作为备份
backup_file = filepath.replace('.csv', f'_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
try:
with open(backup_file, 'w', encoding='utf-8') as f:
json.dump(data_batch, f, ensure_ascii=False, indent=2)
logging.info(f"数据已备份为JSON: {backup_file}")
except Exception as backup_error:
logging.error(f"保存数据和备份都失败了: 原始错误: {e}, 备份错误: {backup_error}")
logging.error(f"保存到CSV失败: {e}")
def main():
"""
主函数
"""
global start_line, api_call_counter
try:
# 读取上次的错误行号
if os.path.exists(ERROR_LINE_FILE_PATH):
try:
with open(ERROR_LINE_FILE_PATH, 'r') as f:
start_line = int(f.read().strip())
logging.info(f"从上次错误的第 {start_line} 行继续执行")
except Exception as e:
logging.warning(f"读取上次错误行号失败: {e}")
# 从token_status获取当前token的daily_calls
if os.path.exists(TOKEN_STATUS_FILE):
df = pd.read_csv(TOKEN_STATUS_FILE)
token_info = df[df['token'] == DEFAULT_PARAMS['token']].iloc[0]
api_call_counter = token_info['daily_calls']
logging.info(f"当前token已调用次数: {api_call_counter}")
start_time = time.time()
# 读取输入文件
qa_pairs = read_input_file()
if not qa_pairs:
terminate_program("没有读入参")
return
total = len(qa_pairs)
logging.info(f'开始处理 {total} 个入参, 入参: ===== {qa_pairs} =====,(总计 {total} 次查询)')
# 顺序处理每个关键词
for i, qa_pair in enumerate(qa_pairs):
line_number, question = qa_pair
try:
# 确保有可用token
if DEFAULT_PARAMS['token'] is None:
current_token = get_available_token()
if current_token is None:
logging.error("无法获取可用token,程序终止")
break
DEFAULT_PARAMS['token'] = current_token
api_call_counter = 0
logging.info(f"使用新token: {current_token}")
# 处理当前关键词
need_new_token = fetch_data_with_retry(question, "问题", total, line_number)
if need_new_token:
current_token = get_available_token()
if current_token:
DEFAULT_PARAMS['token'] = current_token
api_call_counter = 0
logging.info(f"切换到新token: {current_token}")
else:
logging.error("无法获取新token,程序终止")
break
except Exception as e:
logging.error(f"处理关键词 {question} 时出错: {e}")
continue
# 程序正常结束,更新最后使用的token状态
if DEFAULT_PARAMS['token']:
update_token_status(DEFAULT_PARAMS['token'], None, api_call_counter)
logging.info("所有入参处理完成!")
except Exception as e:
logging.error(f"程序异常: {e}")
if __name__ == "__main__":
main()
token管理脚本
import json
import logging
import os
import sys
import time
import warnings
from datetime import datetime
import requests
import pandas as pd
import argparse
import random
# 禁用所有警告
warnings.filterwarnings('ignore')
# 配置日志格式和处理器
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.StreamHandler(sys.stdout)
]
)
# 获取当前脚本所在目录的绝对路径
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(os.path.dirname(SCRIPT_DIR))
TOKEN_STATUS_FILE = os.path.join(PROJECT_ROOT, '二阶段/全局上下文/token_status.csv')
# API配置
BASE_URL = "https://www.123456.com/index.php"
PROXIES = {
'http': 'http://127.0.0.1:7890',
'https': 'http://127.0.0.1:7890'
}
MAX_API_CALLS = 790 # API调用次数上限
def register_new_account():
"""
注册新账号获取token
包含:
1. 获取手机号
2. 发送验证码
3. 接收验证码
4. 注册
5. 保存token
Returns:
str: 新注册账号的token
"""
try:
import requests
import time
import json
import hashlib
import random
# 1. 登录接码平台获取token
sms_platform_user = "123"
sms_platform_pass = "123"
sms_login_url = f"https://www.123.cn/sms/?api=login&user={sms_platform_user}&pass={sms_platform_pass}"
response = requests.get(sms_login_url)
sms_platform_data = response.json()
if sms_platform_data["code"] != 0:
raise Exception(f"接码平台登录失败: {sms_platform_data['msg']}")
sms_token = sms_platform_data["token"]
logging.info(f"接码平台登录成功, token: {sms_token}")
max_retries = 5 # 最大重试次数
for retry in range(max_retries):
try:
# 2. 获取手机号
project_id = "1234556"
get_phone_url = f"https://www.123.cn/sms/?api=getPhone&token={sms_token}&sid={project_id}"
response = requests.get(get_phone_url)
phone_data = response.json()
if phone_data["code"] != '0':
raise Exception(f"获取手机号失败: {phone_data['msg']}")
phone_number = phone_data["phone"]
logging.info(f"获取手机号成功, 手机号: {phone_number}")
# 3. 发送验证码
device_id = f"952043fe6ce7b84{random.randint(0, 9)}"
params = {
"mobile": phone_number,
"smstype": "6",
"ticket": "",
"randStr": ""
}
params_str = json.dumps(params)
# 生成sign (这里需要根据实际签名算法修改)
sign_str = f"params={params_str}×tamp={int(time.time())}&key=your_secret_key"
sign = hashlib.md5(sign_str.encode()).hexdigest().upper()
send_sms_url = "https://www.123.com/index.php/sms/send"
send_sms_params = {
"其余参数": "xxx",
"timestamp": int(time.time())
}
headers = {
"Accept": "*/*",
"Accept-Encoding": "gzip",
"Connection": "keep-alive",
"User-Agent": "okhttp-okgo/jeasonlzy",
"accept-language": "zh-CN,zh;q=0.8"
}
response = requests.get(send_sms_url, params=send_sms_params, headers=headers)
if response.status_code != 200:
raise Exception("发送验证码失败")
logging.info("发送验证码成功,等待5秒后获取验证码")
# 4. 获取验证码
start_time = time.time()
sms_code = None
time.sleep(5) # 等待5秒
logging.info("开始获取验证码")
while time.time() - start_time < 60: # 等待1分钟
elapsed_time = int(time.time() - start_time)
logging.info(f"等待验证码中...已等待{elapsed_time}秒")
get_sms_url = f"https://www.123.cn/sms/?api=getMessage&token={sms_token}&sid={project_id}&phone={phone_number}"
response = requests.get(get_sms_url)
sms_data = response.json()
logging.info(f"获取验证码响应: {json.dumps(sms_data)}")
if sms_data["code"] == "0" and sms_data.get("yzm"):
sms_code = sms_data["yzm"]
logging.info(f"获取验证码成功: {sms_code}")
break
time.sleep(5) # 每5秒检查一次
if not sms_code:
if retry < max_retries - 1:
logging.warning(f"1分钟内未收到验证码,尝试重新获取手机号,当前重试次数: {retry + 1}")
continue
else:
raise Exception("多次尝试后仍未收到验证码")
# 5. 注册获取token
register_params = {
"mobile": phone_number,
"smscode": sms_code,
"smstype": 6,
"oauthType": 1
}
register_params_str = json.dumps(register_params)
# 生成注册sign
register_sign_str = f"params={register_params_str}×tamp={int(time.time())}&key=your_secret_key"
register_sign = hashlib.md5(register_sign_str.encode()).hexdigest().upper()
register_url = "https://www.123.com/index.php/login"
register_params = {
"其余参数": "xxx",
"timestamp": int(time.time())
}
response = requests.get(register_url, params=register_params, headers=headers)
register_data = response.json()
logging.info(f"注册获取token响应: {json.dumps(register_data)}")
if response.status_code != 200:
raise Exception("注册失败")
# register_data.data.token
new_token = register_data.get("data", {}).get("token")
if not new_token:
raise Exception("获取token失败")
# 6. 保存token状态
update_token_status(new_token, 1, 0)
# 7. 维护token
maintain_token(new_token)
return new_token
except Exception as e:
if retry < max_retries - 1:
logging.warning(f"注册失败,准备重试,当前重试次数: {retry + 1}, 错误: {str(e)}")
continue
else:
raise e
except Exception as e:
logging.error(f"注册新账号失败: {e}")
return None
def maintain_token(token: str):
"""
维护已有token的状态
包含:
5. 签到一次
6. 增加积分(120 次)
7. 兑换一天 svip
Args:
token: 需要维护的token
Returns:
bool: 维护是否成功
"""
try:
# 基础请求参数
base_params = {
"其余参数": "xxx",
'token': token,
}
headers = {
'Accept': '*/*',
'Accept-Encoding': 'gzip',
'Connection': 'keep-alive',
'User-Agent': 'okhttp-okgo/jeasonlzy',
'accept-language': 'zh-CN,zh;q=0.8'
}
# 1. 签到
sign_url = 'https://www.123.com/index.php/Sign'
sign_params = base_params.copy()
sign_params.update({
'sign': '123456',
'params': '{}',
'timestamp': str(int(time.time()))
})
response = requests.get(
sign_url,
params=sign_params,
headers=headers,
proxies=PROXIES,
verify=False
)
if response.status_code != 200 or response.json().get('code') != 200:
logging.error(f"签到失败: {response.text}")
return False
logging.info("签到成功")
# 2. 领取积分(循环100次)
score_url = 'https://www.123.com/index.php/getScore'
score_params = base_params.copy()
score_params.update({
'sign': '123',
'params': '{}',
})
for i in range(200):
score_params['timestamp'] = str(int(time.time()))
response = requests.get(
score_url,
params=score_params,
headers=headers,
proxies=PROXIES,
verify=False
)
if response.status_code != 200 or response.json().get('code') != 200:
logging.error(f"第{i+1}次领取积分失败: {response.text}")
continue
if i % 10 == 0:
logging.info(f"已完成 {i+1}/200 次积分领取")
# time.sleep(random.uniform(0.5, 1)) # 随机等待0.5-1秒
# 3. 兑换VIP
vip_url = 'https://www.123.com/index.php/exchange'
vip_params = base_params.copy()
vip_params.update({
'sign': '123',
'params': '{"vip_id":21}',
'timestamp': str(int(time.time()))
})
response = requests.get(
vip_url,
params=vip_params,
headers=headers,
proxies=PROXIES,
verify=False
)
if response.status_code != 200 or response.json().get('code') != 200:
logging.error(f"兑换VIP失败: {response.text}")
return False
logging.info("兑换VIP成功")
# 所有操作成功,只更新当日调用次数和时间
update_token_status(token, None, 0) # status=None 表示不改变状态
return True
except Exception as e:
logging.error(f"维护token失败: {e}")
return False
def update_token_status(token: str, status: int = None, api_calls: int = None):
"""
更新token的状态
Args:
token: token字符串
status: 状态(0 - 禁用, 1 - 可用, None - 不改变状态)
api_calls: API调用次数
"""
try:
# 确保目录存在
os.makedirs(os.path.dirname(TOKEN_STATUS_FILE), exist_ok=True)
# 读取或创建状态文件
if os.path.exists(TOKEN_STATUS_FILE):
df = pd.read_csv(TOKEN_STATUS_FILE)
else:
df = pd.DataFrame(columns=['token', 'status', 'api_calls', 'daily_calls', 'create_time', 'update_time'])
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
new_record = {
'token': token,
'status': status,
'api_calls': api_calls if api_calls is not None else 0,
'daily_calls': api_calls if api_calls is not None else 0, # 新token或新的一天,daily_calls等于api_calls
'create_time': current_time if token not in df['token'].values else df[df['token'] == token]['create_time'].iloc[0],
'update_time': current_time
}
if token in df['token'].values:
mask = df['token'] == token
old_update_time = pd.to_datetime(df.loc[mask, 'update_time'].iloc[0])
# 如果是同一天的更新,保留当日调用次数
if old_update_time.strftime('%Y-%m-%d') == datetime.now().strftime('%Y-%m-%d'):
new_record['daily_calls'] = df.loc[mask, 'daily_calls'].iloc[0] + (api_calls or 0)
# 如果status为None,保持原状态
if status is None:
new_record['status'] = df.loc[mask, 'status'].iloc[0]
for key, value in new_record.items():
if key != 'create_time': # 不更新create_time
df.loc[mask, key] = value
else:
# 新token默认状态为1(如果没有指定状态)
if status is None:
new_record['status'] = 1
df = pd.concat([df, pd.DataFrame([new_record])], ignore_index=True)
# 保存到文件
df.to_csv(TOKEN_STATUS_FILE, index=False)
status_text = f", 状态: {status}" if status is not None else ""
logging.info(f"Token状态已更新 - Token: {token}{status_text}, API调用次数: {api_calls}")
except Exception as e:
logging.error(f"更新Token状态失败: {e}")
def get_available_token():
"""
从token状态文件中获取可用的token,如果token需要维护则进行维护
按照文件中的顺序从上到下查找第一个可用的token
Returns:
str: 可用的token,如果没有则返回None
"""
try:
if os.path.exists(TOKEN_STATUS_FILE):
df = pd.read_csv(TOKEN_STATUS_FILE)
today = datetime.now().strftime('%Y-%m-%d')
# 筛选出状态为可用的token
available_tokens = df[df['status'] == 1]
if not available_tokens.empty:
# 遍历所有可用token(保持原始顺序)
for _, token_row in available_tokens.iterrows():
token_update_time = pd.to_datetime(token_row['update_time']).strftime('%Y-%m-%d')
# 如果是今天更新过的token且未达到调用限制,直接使用
if token_update_time == today:
if token_row['daily_calls'] < MAX_API_CALLS:
return token_row['token']
continue
# 如果不是今天更新的,尝试维护
token = token_row['token']
logging.info(f"Token {token} 需要维护...")
if maintain_token(token):
logging.info("Token维护成功")
return token
logging.error("Token维护失败,尝试下一个token")
return None
except Exception as e:
logging.error(f"获取可用token失败: {e}")
return None
if __name__ == "__main__":
method = 4
if method == 1:
# # 列出所有token及其状态
if os.path.exists(TOKEN_STATUS_FILE):
df = pd.read_csv(TOKEN_STATUS_FILE)
print("\nToken状态列表:")
print("=" * 80)
for _, row in df.iterrows():
status_text = "可用" if row['status'] == 1 else "禁用"
print(f"Token: {row['token']}")
print(f"状态: {status_text}")
print(f"API调用次数: {row['api_calls']}")
print(f"今日调用次数: {row['daily_calls']}")
print(f"创建时间: {row['create_time']}")
print(f"更新时间: {row['update_time']}")
print("-" * 80)
elif method == 2:
# 检查可用token
token = get_available_token()
if token:
logging.info(f"找到可用token: {token}")
sys.exit(0)
else:
logging.error("没有可用的token")
elif method == 3:
# 维护指定token
token = "123456"
if not token:
logging.error("维护操作需要指定token参数")
sys.exit(1)
if maintain_token(token):
logging.info(f"成功维护token: {token}")
sys.exit(0)
else:
logging.error(f"维护token失败: {token}")
elif method == 4:
# 注册新token
token = register_new_account()
if token:
logging.info(f"成功注册新token: {token}")
sys.exit(0)
else:
logging.error("注册新token失败")