进阶版 -- 某恋爱话术 app 的爬虫经历与思考(含脚本)

背景

承接前文,由于上一个app 爬出来的数据只有 1w 多条,感觉不是很过瘾

所以这次又找到了一个非破解版 app,数据量大概有 40w,安全等级直线上升

声明

本次爬虫是学习实践行为,获取到的数据均已在 24 小时内全部删除,请大家不要轻易模仿,维护网络生态,共建美好生活

抓包分析

首先通过 Reqable 和 mumu 安卓模拟器进行抓包分析

得到如下接口:
QQ_1734416545937

通过接口分析可知,该 app 有登录注册,并且搜索接口有分页功能,还对 sex 参数进行了区分,从而查出不同接口

这里主要对 search 接口进行 爬取

难点

浅尝辄止后发现了以下难点:

难点 1:搜索接口有次数限制,普通用户一天只能搜索 6 次,svip 无限次

数次尝试后,找到了无限获取积分的漏洞,可以靠积分进行svip 的兑换

难点 2:就算有 svip 了,还会有风控机制,如果一个用户 token 就行搜索接口访问,太频繁了会封号(0-2 秒一次的访问频率)

这里其实也可以用线程池解决,不用线程池携带不同 token,不过我还是采用了单线程,保证对方服务器的稳定运行,不给对方造成任何经济上的损失

难点 3:经多次验证后发现,单个 token 日访问次数达到 730 次后,会被封号

难点 4:有分页机制,且 10 只会展示 10 页,

有时候 11 页会重复展示 10页的内容,这种情况会有无限页;

有时候 11 页数据为空

需要考虑到这两种情况

解决方案

迭代 1:还是用单字作为入参,5000 多行入参去访问,拿到数据后进行去重等处理

迭代 2:增加 740 次调用限制,防止 token 被封号(经测试后发现,第二天还可以重复利用)

迭代 3:一步步实验后,手上的 token 多了起来,于是创建了个 csv 文本,写了个 token 管理脚本,实现了 token 自动化

迭代 4:越来越懒,接入了接码平台,如果没有 token 了,可以自动注册一个,自此,脚本实现全自动化

脚本

Search 脚本

import json
import logging
import os
import random
import sys
import time
import warnings
from datetime import datetime
from typing import List, Dict, Tuple
import subprocess

import pandas as pd
import requests
from token_manager import get_available_token, update_token_status

# 禁用所有警告
warnings.filterwarnings('ignore')

# 配置日志格式和处理器
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] [%(threadName)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

# 配置参数
BASE_URL = 'https://www.123456.com/index.php/search'  # API接口地址
MAX_WORKERS = 1      # 并发线程数
BATCH_SIZE = 10     # 批量保存数据的大小
MAX_RETRIES = 3      # 请求失败时的最大重试次数
REQUEST_TIMEOUT = 10  # 请求超时时间(秒)
MAX_API_CALLS = 750    # API调用次数上限

# 添加代理配置
PROXIES = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890'
}

# HTTP请求头配置
HEADERS = {
    'Accept': '*/*',
    'Accept-Encoding': 'gzip',
    'Connection': 'keep-alive',
    'User-Agent': 'okhttp-okgo/jeasonlzy',
    'accept-language': 'zh-CN,zh;q=0.8'
}

# API请求的默认参数
DEFAULT_PARAMS = {
    'sex': '2',                 # 性别类型
    'package': 'com.xxx', # 应用包名
    'device_id': 'xxx',  # 设备ID
    'sign': 'xxx',  # 签名
    'uuid': 'dddddd-c4c8-5aba-ffff-ffffef05ac4a1',  # 设备UUID
    'token': 'd36cd8735e1a12345656919fb66',      # 用户令牌
    'verid': '1',                   # 版本ID
    'system_version': 'Android 14',   # 系统版本
    'agentname': 'xxx',          # 代理名称
    'appid': 'xxx',           # 应用ID
    'imei': '123124142124',       # IMEI号
    'from': '1',                     # 来源
    'userua': 'Mozilla/5.0 (Linux; Android 12; SM-S9180 Build/xxx; wv) AppleWebKit/xxx (KHTML, like Gecko) Version/4.0 Chrome/11.1.1111.1 Mobile Safari/111.11',  # 用户代理
    'android_id': '123124124124', # 安卓ID
    'device': 'SM-1234',            # 设备型号
    'oaid': '',                      # OAID
    'timestamp': '1733994424'        # 时间戳
}

# 全局变量
query_counter = 0       # 查询计数器
api_call_counter = 0    # API调用计数器
existing_pks = set()    # 已存在的主键集合,用于去重
start_line = 0         # 起始行号,用于断点续传

# 获取当前脚本所在目录的绝对路径
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
# 获取项目根目录(假设脚本在二阶段/脚本/目录下)
PROJECT_ROOT = os.path.dirname(os.path.dirname(SCRIPT_DIR))

# 文件路径配置
INPUT_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/入参/chinese_only.txt')
VALID_INPUT_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/入参/valid_chinese.txt')  # 新增:有效入参文件路径
OUTPUT_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/结果/twoSearch.csv')
ERROR_LINE_FILE_PATH = os.path.join(PROJECT_ROOT, '二阶段/全局上下文/last_error_line.txt')
TOKEN_STATUS_FILE = os.path.join(PROJECT_ROOT, '二阶段/全局上下文/token_status.csv')

# 确保必要的目录存在
os.makedirs(os.path.dirname(OUTPUT_FILE_PATH), exist_ok=True)
os.makedirs(os.path.dirname(ERROR_LINE_FILE_PATH), exist_ok=True)

def terminate_program(error_msg: str, line_number: int = None, data_to_save: List[Dict] = None):
    """
    终止程序的辅助函数,在终止前保存数据
    """
    global api_call_counter
    
    # 如果有数据需要保存,先进行保存
    logging.info("需要保存的数据:", data_to_save)
    if data_to_save:
        try:
            logging.info("程序终止前尝试保存已获取的数据...")
            save_batch_to_csv(data_to_save, OUTPUT_FILE_PATH)
            logging.info("数据保存成功")
        except Exception as save_error:
            logging.error(f"终止前保存数据失败: {save_error}")

    # 保存行号
    if line_number is not None:
        logging.error(f"程序终止于第 {line_number} 行: {error_msg}")
        with open(ERROR_LINE_FILE_PATH, 'w') as f:
            f.write(str(line_number))
    else:
        logging.error(f"程序终止: {error_msg}")
    
    sys.exit(1)

def read_input_file(file_path=INPUT_FILE_PATH) -> List[Tuple[int, str]]:
    """
    从文本文件中读取单字输入,每行一个字
    Args:
        file_path: 文本文件路径
    Returns:
        List of tuples containing (line_number, character)
    """
    global start_line
    
    # 检查文件是否存在
    if not os.path.exists(file_path):
        logging.error(f"输入文件不存在: {file_path}")
        # 尝试在当前目录查找
        current_dir_path = os.path.join(os.getcwd(), file_path)
        if os.path.exists(current_dir_path):
            file_path = current_dir_path
            logging.info(f"在当前目录找到输入文件: {current_dir_path}")
        else:
            terminate_program(f"找不到输入文件,已尝试以下路径:\n1. {file_path}\n2. {current_dir_path}")
            return []

    chars = []
    total_lines = sum(1 for _ in open(file_path, 'r', encoding='utf-8'))
    
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            # 跳过前start_line-1行
            for _ in range(start_line - 1):
                next(f, None)
            
            # todo 从start_line开始读取,限制只读取5行用于测试
            # test_limit = 1
            for i, line in enumerate(f, start=start_line):
                if i % 1000 == 0:
                    logging.info(f'读取进度: {i}/{total_lines} ({(i/total_lines*100):.2f}%)')
                
                character = line.strip()
                if not character:
                    continue
                
                chars.append((i, character))
                
                # todo 达到测试限制后退出
                # if len(chars) >= test_limit:
                #     logging.info(f'已达到测试限制({test_limit}行),停止读取')
                #     break
                
        logging.info(f'总共读取了 {len(chars)} 个字符(从第 {start_line} 行开始)')
        return chars
    except Exception as e:
        terminate_program(f'读取文件时出错: {str(e)}')
        return []

def save_valid_input(keyword: str) -> None:
    """
    将有效的入参保存到新文件中
    
    Args:
        keyword: 要保存的关键词
    """
    try:
        # 确保目录存在
        os.makedirs(os.path.dirname(VALID_INPUT_FILE_PATH), exist_ok=True)
        
        # 追加模式写入文件
        with open(VALID_INPUT_FILE_PATH, 'a', encoding='utf-8') as f:
            f.write(f"{keyword}\n")
        logging.info(f'关键词 {keyword} 已保存到有效入参文件')
    except Exception as e:
        logging.error(f'保存有效入参时出错: {e}')

def check_api_limit(line_number: int = None, data_to_save: List[Dict] = None) -> bool:
    """
    检查API调用次数是否达到限制,达到限制时尝试更换token
    """
    global api_call_counter
    
    if api_call_counter >= MAX_API_CALLS:
        # 只更新调用次数和时间,不改变状态
        update_token_status(DEFAULT_PARAMS['token'], None, api_call_counter)
        logging.warning(f"当前token已达到API调用限制({MAX_API_CALLS}次),尝试获取新token")
        
        # 获取新的token
        new_token = get_available_token()
        if new_token:
            DEFAULT_PARAMS['token'] = new_token
            api_call_counter = 0  # 重置计数器
            logging.info(f"成功切换到新token: {new_token}")
            return False
        else:
            logging.error("无法获取新的可用token")
            return True
    return False

def fetch_data_with_retry(keyword: str, query_type: str, total_queries: int, line_number: int) -> bool:
    """
    带重试机制的数据获取函数
    Returns:
        bool: 是否需要更换token
    """
    global query_counter, page_data, current_first_id, response, api_call_counter
    
    try:
        page = 1           # 当前页码
        all_data = []      # 存储所有页的数据
        last_first_id = None  # 记录上一页第一条数据的 id
        
        # 检查API调用次数是否超过限制
        if check_api_limit(line_number, all_data):
            return True
            
        # 更新查询计数
        query_counter += 1
        current_count = query_counter
        
        logging.info(f'正在处理第 {line_number} 行: {current_count}/{total_queries} - {query_type}: {keyword} - 第 {page} 页')
        
        while True:  # 循环处理所有页
            try:
                # 准备搜索参数
                search_params = {
                    "keyword": keyword,
                    "search_type": "0",
                    "page": str(page)
                }
                
                # 构造完整的请求参数
                params = DEFAULT_PARAMS.copy()
                params['params'] = json.dumps(search_params)
                params['timestamp'] = str(int(time.time()))
                
                # 随机等待  0-1秒
                time.sleep(random.uniform(2, 3))
                
                # 发送HTTP请求
                for attempt in range(MAX_RETRIES):
                    try:
                        api_call_counter += 1  # 增加API调用计数
                        response = requests.get(
                            BASE_URL,
                            params=params,
                            headers=HEADERS,
                            timeout=REQUEST_TIMEOUT,
                            verify=False,
                            proxies=PROXIES
                        )
                        
                        if page > 1:
                            logging.info(f'正在获取第 {page} 页数据 - 关键词: {keyword}')
                        
                        # 解析响应数据
                        data = response.json()
                        if data.get('code') == 200:
                            print(f"成功数据[调用次数:{api_call_counter}]======", response.text[:50] + "...")
                        else:
                            print(f"失败数据[调用次数:{api_call_counter}]======", response.text)
                        response.raise_for_status()
                        break  # 如果请求成功,跳出重试循环
                        
                    except Exception as e:
                        if attempt == MAX_RETRIES - 1:  # 最后一次尝试失败
                            terminate_program(f"获取数据失败,已达到最大重试次数: {e}", line_number)
                        logging.warning(f"获取第 {line_number}{keyword} 数据失败 (尝试 {attempt + 1}/{MAX_RETRIES}): {e}")
                        time.sleep(10)  # 失败后等待 10 秒再重试
                
                data = response.json()
                
                # 检查API调用次数是否超过限制
                if check_api_limit(line_number, all_data):
                    return True
                
                # 处理API响应
                if data.get('code') != 200:
                    error_msg = data.get('msg', '未知错误')
                    if '账号已触发风控' in error_msg:
                        # 更新token状态为不可用
                        update_token_status(DEFAULT_PARAMS['token'], 0, api_call_counter)
                        logging.error(f"API返回错误: {error_msg},尝试更换token")
                        return True
                
                # 检查是否还有数据
                page_data = data.get('data', [])
                if not page_data:
                    if page == 1:
                        logging.info(f'关键词 {keyword} 没有搜索结果')
                    else:
                        logging.info(f'关键词 {keyword} 已到达最后一页: {page-1}')
                    break
                
                # 如果是第一页且有数据,保存该入参
                if page == 1:
                    save_valid_input(keyword)
                
                # 检查当前页第一条数据的 id 是否与上一页相同
                current_first_id = page_data[0].get('id')
                if current_first_id == last_first_id:
                    logging.info(f'关键词 {keyword} 检测到重复数据,停止获取更多页')
                    break
                
                # 更新上一页第一条数据的 id
                last_first_id = current_first_id
                
                # 处理返回的数据,添加额外信息
                for item in page_data:
                    item['original_keyword'] = keyword  # 原始关键词
                    item['line_number'] = line_number  # 行号(保持原始行号)
                    item['page'] = page               # 页码
                
                all_data.extend(page_data)
                page += 1  # 准备获取下一页
                
            except Exception as e:
                # 如果有已获取的数据,在终止前保存
                if all_data:
                    terminate_program(f"处理数据时出错: {e}", line_number, all_data)
                else:
                    terminate_program(f"处理数据时出错: {e}", line_number)
        
        # 如果有数据就保存
        if all_data:
            save_batch_to_csv(all_data, OUTPUT_FILE_PATH)
        
        return False
    except Exception as e:
        logging.error(f"处理关键词 {keyword} 时出错: {e}")
        return False

def save_batch_to_csv(data_batch: List[Dict], filepath: str):
    """
    批量保存数据到CSV,包含去重逻辑
    
    Args:
        data_batch: 要保存的数据批次
        filepath: CSV文件路径
    """
    global existing_pks, existing_df

    if not data_batch:
        return
        
    try:
        # 创建数据目录
        os.makedirs(os.path.dirname(filepath), exist_ok=True)
        
        # 将新数据转换为DataFrame
        new_df = pd.DataFrame(data_batch)
        
        # 如果文件已存在,读取现有数据
        if os.path.exists(filepath):
            existing_df = pd.read_csv(filepath, encoding='utf-8-sig')
            if 'id' in existing_df.columns:
                existing_pks.update(existing_df['id'].tolist())
        
        if 'id' in new_df.columns:
            # 批次内去重
            new_df = new_df.drop_duplicates(subset=['id'], keep='first')
            logging.info(f"批次内去重完成,剩余数据量: {len(new_df)}")
            
            # 与已存在的id去重
            new_df = new_df[~new_df['id'].isin(existing_pks)]
            logging.info(f"与已存在id去重完成,剩余数据量: {len(new_df)}")

            # 剩余数据量为0,则不保存
            if len(new_df) == 0:
                logging.info(f"剩余数据量为0,不保存")
                return
            
            # 更新已存在的id集合
            existing_pks.update(new_df['id'].tolist())
        
        # 如果文件存在,追加数据;否则创建新文件
        if os.path.exists(filepath):
            # 追加模式,不写入表头
            new_df.to_csv(filepath, mode='a', header=False, index=False, encoding='utf-8-sig')
            # 读取完整文件以获取总记录数
            total_records = len(pd.read_csv(filepath, encoding='utf-8-sig'))
        else:
            # 新文件,写入表头
            new_df.to_csv(filepath, index=False, encoding='utf-8-sig')
            total_records = len(new_df)
        
        logging.info(f"批量数据已追加到 {filepath}, 当前共有 {total_records} 条记录")
        
    except Exception as e:
        # 保存为JSON作为备份
        backup_file = filepath.replace('.csv', f'_backup_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json')
        try:
            with open(backup_file, 'w', encoding='utf-8') as f:
                json.dump(data_batch, f, ensure_ascii=False, indent=2)
            logging.info(f"数据已备份为JSON: {backup_file}")
        except Exception as backup_error:
            logging.error(f"保存数据和备份都失败了: 原始错误: {e}, 备份错误: {backup_error}")
        logging.error(f"保存到CSV失败: {e}")

def main():
    """
    主函数
    """
    global start_line, api_call_counter
    
    try:
        # 读取上次的错误行号
        if os.path.exists(ERROR_LINE_FILE_PATH):
            try:
                with open(ERROR_LINE_FILE_PATH, 'r') as f:
                    start_line = int(f.read().strip())
                    logging.info(f"从上次错误的第 {start_line} 行继续执行")
            except Exception as e:
                logging.warning(f"读取上次错误行号失败: {e}")
        
        # 从token_status获取当前token的daily_calls
        if os.path.exists(TOKEN_STATUS_FILE):
            df = pd.read_csv(TOKEN_STATUS_FILE)
            token_info = df[df['token'] == DEFAULT_PARAMS['token']].iloc[0]
            api_call_counter = token_info['daily_calls']
            logging.info(f"当前token已调用次数: {api_call_counter}")
        
        start_time = time.time()
        
        # 读取输入文件
        qa_pairs = read_input_file()
        if not qa_pairs:
            terminate_program("没有读入参")
            return
        
        total = len(qa_pairs)
        logging.info(f'开始处理 {total} 个入参,  入参: ===== {qa_pairs} =====,(总计 {total} 次查询)')
        
        # 顺序处理每个关键词
        for i, qa_pair in enumerate(qa_pairs):
            line_number, question = qa_pair
            try:
                # 确保有可用token
                if DEFAULT_PARAMS['token'] is None:
                    current_token = get_available_token()
                    if current_token is None:
                        logging.error("无法获取可用token,程序终止")
                        break
                    DEFAULT_PARAMS['token'] = current_token
                    api_call_counter = 0
                    logging.info(f"使用新token: {current_token}")
                
                # 处理当前关键词
                need_new_token = fetch_data_with_retry(question, "问题", total, line_number)
                if need_new_token:
                    current_token = get_available_token()
                    if current_token:
                        DEFAULT_PARAMS['token'] = current_token
                        api_call_counter = 0
                        logging.info(f"切换到新token: {current_token}")
                    else:
                        logging.error("无法获取新token,程序终止")
                        break
                        
            except Exception as e:
                logging.error(f"处理关键词 {question} 时出错: {e}")
                continue
        
        # 程序正常结束,更新最后使用的token状态
        if DEFAULT_PARAMS['token']:
            update_token_status(DEFAULT_PARAMS['token'], None, api_call_counter)
        
        logging.info("所有入参处理完成!")
        
    except Exception as e:
        logging.error(f"程序异常: {e}")

if __name__ == "__main__":
    main()

token管理脚本

import json
import logging
import os
import sys
import time
import warnings
from datetime import datetime
import requests
import pandas as pd
import argparse
import random

# 禁用所有警告
warnings.filterwarnings('ignore')

# 配置日志格式和处理器
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)

# 获取当前脚本所在目录的绝对路径
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PROJECT_ROOT = os.path.dirname(os.path.dirname(SCRIPT_DIR))
TOKEN_STATUS_FILE = os.path.join(PROJECT_ROOT, '二阶段/全局上下文/token_status.csv')

# API配置
BASE_URL = "https://www.123456.com/index.php"
PROXIES = {
    'http': 'http://127.0.0.1:7890',
    'https': 'http://127.0.0.1:7890'
}

MAX_API_CALLS = 790    # API调用次数上限

def register_new_account():
    """
    注册新账号获取token
    包含:
    1. 获取手机号
    2. 发送验证码
    3. 接收验证码
    4. 注册
    5. 保存token
    
    Returns:
        str: 新注册账号的token
    """
    try:
        import requests
        import time
        import json
        import hashlib
        import random

        # 1. 登录接码平台获取token
        sms_platform_user = "123"
        sms_platform_pass = "123"
        sms_login_url = f"https://www.123.cn/sms/?api=login&user={sms_platform_user}&pass={sms_platform_pass}"

        response = requests.get(sms_login_url)
        sms_platform_data = response.json()
        if sms_platform_data["code"] != 0:
            raise Exception(f"接码平台登录失败: {sms_platform_data['msg']}")

        sms_token = sms_platform_data["token"]
        logging.info(f"接码平台登录成功, token: {sms_token}")

        max_retries = 5  # 最大重试次数
        for retry in range(max_retries):
            try:
                # 2. 获取手机号
                project_id = "1234556"
                get_phone_url = f"https://www.123.cn/sms/?api=getPhone&token={sms_token}&sid={project_id}"
                response = requests.get(get_phone_url)
                phone_data = response.json()
                if phone_data["code"] != '0':
                    raise Exception(f"获取手机号失败: {phone_data['msg']}")

                phone_number = phone_data["phone"]
                logging.info(f"获取手机号成功, 手机号: {phone_number}")

                # 3. 发送验证码
                device_id = f"952043fe6ce7b84{random.randint(0, 9)}"
                params = {
                    "mobile": phone_number,
                    "smstype": "6",
                    "ticket": "",
                    "randStr": ""
                }
                params_str = json.dumps(params)

                # 生成sign (这里需要根据实际签名算法修改)
                sign_str = f"params={params_str}&timestamp={int(time.time())}&key=your_secret_key"
                sign = hashlib.md5(sign_str.encode()).hexdigest().upper()

                send_sms_url = "https://www.123.com/index.php/sms/send"
                send_sms_params = {
                    "其余参数": "xxx",
                    "timestamp": int(time.time())
                }

                headers = {
                    "Accept": "*/*",
                    "Accept-Encoding": "gzip",
                    "Connection": "keep-alive",
                    "User-Agent": "okhttp-okgo/jeasonlzy",
                    "accept-language": "zh-CN,zh;q=0.8"
                }

                response = requests.get(send_sms_url, params=send_sms_params, headers=headers)
                if response.status_code != 200:
                    raise Exception("发送验证码失败")
                logging.info("发送验证码成功,等待5秒后获取验证码")

                # 4. 获取验证码
                start_time = time.time()
                sms_code = None
                time.sleep(5)  # 等待5秒
                logging.info("开始获取验证码")
                while time.time() - start_time < 60:  # 等待1分钟
                    elapsed_time = int(time.time() - start_time)
                    logging.info(f"等待验证码中...已等待{elapsed_time}秒")

                    get_sms_url = f"https://www.123.cn/sms/?api=getMessage&token={sms_token}&sid={project_id}&phone={phone_number}"
                    response = requests.get(get_sms_url)
                    sms_data = response.json()
                    logging.info(f"获取验证码响应: {json.dumps(sms_data)}")

                    if sms_data["code"] == "0" and sms_data.get("yzm"):
                        sms_code = sms_data["yzm"]
                        logging.info(f"获取验证码成功: {sms_code}")
                        break
                    time.sleep(5)  # 每5秒检查一次

                if not sms_code:
                    if retry < max_retries - 1:
                        logging.warning(f"1分钟内未收到验证码,尝试重新获取手机号,当前重试次数: {retry + 1}")
                        continue
                    else:
                        raise Exception("多次尝试后仍未收到验证码")

                # 5. 注册获取token
                register_params = {
                    "mobile": phone_number,
                    "smscode": sms_code,
                    "smstype": 6,
                    "oauthType": 1
                }
                register_params_str = json.dumps(register_params)

                # 生成注册sign
                register_sign_str = f"params={register_params_str}&timestamp={int(time.time())}&key=your_secret_key"
                register_sign = hashlib.md5(register_sign_str.encode()).hexdigest().upper()

                register_url = "https://www.123.com/index.php/login"
                register_params = {
                    "其余参数": "xxx",
                    "timestamp": int(time.time())
                }

                response = requests.get(register_url, params=register_params, headers=headers)
                register_data = response.json()
                logging.info(f"注册获取token响应: {json.dumps(register_data)}")

                if response.status_code != 200:
                    raise Exception("注册失败")

                # register_data.data.token
                new_token = register_data.get("data", {}).get("token")
                if not new_token:
                    raise Exception("获取token失败")

                # 6. 保存token状态
                update_token_status(new_token, 1, 0)

                # 7. 维护token
                maintain_token(new_token)

                return new_token

            except Exception as e:
                if retry < max_retries - 1:
                    logging.warning(f"注册失败,准备重试,当前重试次数: {retry + 1}, 错误: {str(e)}")
                    continue
                else:
                    raise e

    except Exception as e:
        logging.error(f"注册新账号失败: {e}")
        return None

def maintain_token(token: str):
    """
    维护已有token的状态
    包含:
    5. 签到一次
    6. 增加积分(120 次)
    7. 兑换一天 svip
    
    Args:
        token: 需要维护的token
    
    Returns:
        bool: 维护是否成功
    """
    try:
        # 基础请求参数
        base_params = {
            "其余参数": "xxx",
            'token': token,
        }

        headers = {
            'Accept': '*/*',
            'Accept-Encoding': 'gzip',
            'Connection': 'keep-alive',
            'User-Agent': 'okhttp-okgo/jeasonlzy',
            'accept-language': 'zh-CN,zh;q=0.8'
        }

        # 1. 签到
        sign_url = 'https://www.123.com/index.php/Sign'
        sign_params = base_params.copy()
        sign_params.update({
            'sign': '123456',
            'params': '{}',
            'timestamp': str(int(time.time()))
        })
        
        response = requests.get(
            sign_url,
            params=sign_params,
            headers=headers,
            proxies=PROXIES,
            verify=False
        )
        if response.status_code != 200 or response.json().get('code') != 200:
            logging.error(f"签到失败: {response.text}")
            return False
        logging.info("签到成功")
        
        # 2. 领取积分(循环100次)
        score_url = 'https://www.123.com/index.php/getScore'
        score_params = base_params.copy()
        score_params.update({
            'sign': '123',
            'params': '{}',
        })
        
        for i in range(200):
            score_params['timestamp'] = str(int(time.time()))
            response = requests.get(
                score_url,
                params=score_params,
                headers=headers,
                proxies=PROXIES,
                verify=False
            )
            if response.status_code != 200 or response.json().get('code') != 200:
                logging.error(f"第{i+1}次领取积分失败: {response.text}")
                continue
            if i % 10 == 0:
                logging.info(f"已完成 {i+1}/200 次积分领取")
            # time.sleep(random.uniform(0.5, 1))  # 随机等待0.5-1秒
        
        # 3. 兑换VIP
        vip_url = 'https://www.123.com/index.php/exchange'
        vip_params = base_params.copy()
        vip_params.update({
            'sign': '123',
            'params': '{"vip_id":21}',
            'timestamp': str(int(time.time()))
        })
        
        response = requests.get(
            vip_url,
            params=vip_params,
            headers=headers,
            proxies=PROXIES,
            verify=False
        )
        if response.status_code != 200 or response.json().get('code') != 200:
            logging.error(f"兑换VIP失败: {response.text}")
            return False
        logging.info("兑换VIP成功")
        
        # 所有操作成功,只更新当日调用次数和时间
        update_token_status(token, None, 0)  # status=None 表示不改变状态
        return True
        
    except Exception as e:
        logging.error(f"维护token失败: {e}")
        return False

def update_token_status(token: str, status: int = None, api_calls: int = None):
    """
    更新token的状态
    
    Args:
        token: token字符串
        status: 状态(0 - 禁用, 1 - 可用, None - 不改变状态)
        api_calls: API调用次数
    """
    try:
        # 确保目录存在
        os.makedirs(os.path.dirname(TOKEN_STATUS_FILE), exist_ok=True)
        
        # 读取或创建状态文件
        if os.path.exists(TOKEN_STATUS_FILE):
            df = pd.read_csv(TOKEN_STATUS_FILE)
        else:
            df = pd.DataFrame(columns=['token', 'status', 'api_calls', 'daily_calls', 'create_time', 'update_time'])
        
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        new_record = {
            'token': token,
            'status': status,
            'api_calls': api_calls if api_calls is not None else 0,
            'daily_calls': api_calls if api_calls is not None else 0,  # 新token或新的一天,daily_calls等于api_calls
            'create_time': current_time if token not in df['token'].values else df[df['token'] == token]['create_time'].iloc[0],
            'update_time': current_time
        }
        
        if token in df['token'].values:
            mask = df['token'] == token
            old_update_time = pd.to_datetime(df.loc[mask, 'update_time'].iloc[0])
            
            # 如果是同一天的更新,保留当日调用次数
            if old_update_time.strftime('%Y-%m-%d') == datetime.now().strftime('%Y-%m-%d'):
                new_record['daily_calls'] = df.loc[mask, 'daily_calls'].iloc[0] + (api_calls or 0)
            
            # 如果status为None,保持原状态
            if status is None:
                new_record['status'] = df.loc[mask, 'status'].iloc[0]
            
            for key, value in new_record.items():
                if key != 'create_time':  # 不更新create_time
                    df.loc[mask, key] = value
        else:
            # 新token默认状态为1(如果没有指定状态)
            if status is None:
                new_record['status'] = 1
            df = pd.concat([df, pd.DataFrame([new_record])], ignore_index=True)
        
        # 保存到文件
        df.to_csv(TOKEN_STATUS_FILE, index=False)
        status_text = f", 状态: {status}" if status is not None else ""
        logging.info(f"Token状态已更新 - Token: {token}{status_text}, API调用次数: {api_calls}")
        
    except Exception as e:
        logging.error(f"更新Token状态失败: {e}")

def get_available_token():
    """
    从token状态文件中获取可用的token,如果token需要维护则进行维护
    按照文件中的顺序从上到下查找第一个可用的token
    
    Returns:
        str: 可用的token,如果没有则返回None
    """
    try:
        if os.path.exists(TOKEN_STATUS_FILE):
            df = pd.read_csv(TOKEN_STATUS_FILE)
            today = datetime.now().strftime('%Y-%m-%d')
            
            # 筛选出状态为可用的token
            available_tokens = df[df['status'] == 1]
            
            if not available_tokens.empty:
                # 遍历所有可用token(保持原始顺序)
                for _, token_row in available_tokens.iterrows():
                    token_update_time = pd.to_datetime(token_row['update_time']).strftime('%Y-%m-%d')
                    
                    # 如果是今天更新过的token且未达到调用限制,直接使用
                    if token_update_time == today:
                        if token_row['daily_calls'] < MAX_API_CALLS:
                            return token_row['token']
                        continue
                    
                    # 如果不是今天更新的,尝试维护
                    token = token_row['token']
                    logging.info(f"Token {token} 需要维护...")
                    if maintain_token(token):
                        logging.info("Token维护成功")
                        return token
                    logging.error("Token维护失败,尝试下一个token")
                
        return None
    except Exception as e:
        logging.error(f"获取可用token失败: {e}")
        return None

if __name__ == "__main__":
    method = 4
    if method == 1:
        # # 列出所有token及其状态
        if os.path.exists(TOKEN_STATUS_FILE):
            df = pd.read_csv(TOKEN_STATUS_FILE)
            print("\nToken状态列表:")
            print("=" * 80)
            for _, row in df.iterrows():
                status_text = "可用" if row['status'] == 1 else "禁用"
                print(f"Token: {row['token']}")
                print(f"状态: {status_text}")
                print(f"API调用次数: {row['api_calls']}")
                print(f"今日调用次数: {row['daily_calls']}")
                print(f"创建时间: {row['create_time']}")
                print(f"更新时间: {row['update_time']}")
                print("-" * 80)
    elif method == 2:
        # 检查可用token
        token = get_available_token()
        if token:
            logging.info(f"找到可用token: {token}")
            sys.exit(0)
        else:
            logging.error("没有可用的token")
    elif method == 3:
        # 维护指定token
        token = "123456"
        if not token:
            logging.error("维护操作需要指定token参数")
            sys.exit(1)
        if maintain_token(token):
            logging.info(f"成功维护token: {token}")
            sys.exit(0)
        else:
            logging.error(f"维护token失败: {token}")
    elif method == 4:
        # 注册新token
        token = register_new_account()
        if token:
            logging.info(f"成功注册新token: {token}")
            sys.exit(0)
        else:
            logging.error("注册新token失败")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/939767.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深入理解 Linux 内核启动流程

目录 一、BIOS 与 Bootloader 1.BIOS&#xff08;Basic Input/Output System&#xff09; 2.Bootloader&#xff08;引导加载程序&#xff09; 二、内核初始化 1.解压内核映像 2.初始化硬件设备 3.建立内存管理系统 4.启动第一个进程&#xff08;init&#xff09; 三、…

Android笔记【19】

具体示例 run: val result someObject.run {// 这里可以使用 thisthis.someMethod() }let: val result someObject?.let {// 这里使用 itit.someMethod() }with: val result with(someObject) {// 这里使用 thissomeMethod() }apply: val obj SomeClass().apply {// 这里使…

【Qt】qt安装

在工作一年之后&#xff0c;还是想做一个Qt的教程&#xff0c;遥想研一刚刚接触Qt&#xff0c;从0到1学习&#xff0c;没有什么参考书籍&#xff0c;网上的资料也不多&#xff0c;幸好Qt官方文档写得好&#xff0c;加上自己肯研究&#xff0c;才堪堪入门。 现在我想自己写一个…

Word使用分隔符实现页面部分分栏

文章目录 Word使用分隔符实现页面部分分栏分隔符使用页面设置 Word使用分隔符实现页面部分分栏 分隔符使用 word中的分隔符&#xff1a; 前面不分栏&#xff0c;后面分栏(或前面分栏&#xff0c;后面不分栏)&#xff0c;只需要在分隔位置处插入分隔符&#xff1a;“连续”即…

搭建Tomcat(四)---Servlet容器

目录 引入 Servlet容器 一、优化MyTomcat ①先将MyTomcat的main函数搬过来&#xff1a; ②将getClass()函数搬过来 ③创建容器 ④连接ServletConfigMapping和MyTomcat 连接&#xff1a; ⑤完整的ServletConfigMapping和MyTomcat方法&#xff1a; a.ServletConfigMappin…

谁说C比C++快?

看到这个问题&#xff0c;我我得说&#xff1a;这事儿没有那么简单。 1. 先把最大的误区打破 "C永远比C快" —— 某位1990年代的程序员 这种说法就像"自行车永远比汽车省油"一样荒谬。我们来看个例子&#xff1a; // C风格 char* str (char*)malloc(100…

html <a>设置发送邮件链接、打电话链接 <a href=“mailto:></a> <a href=“tel:></a>

1.代码 <ul><li>电话&#xff1a;<a href"tel:18888888888">188-8888-8888</a></li><li>邮箱&#xff1a;<a href"mailto:10000qq.com">10000qq.com</a></li><li>邮箱&#xff1a;<a hre…

Nginx三种安装方式

Nginx安装 可以登录 Nginx 的官方网站&#xff1a;https://www.nginx.com/ 找到安装方式。 查看如何安装开源的版本&#xff1a;https://docs.nginx.com/nginx/admin-guide/installing-nginx/installing-nginx-open-source/ 通过官方的说明&#xff0c;也可以知道安装&#…

Android 10 Launcher3 删除谷歌搜索

命令行获取页面 手机处于launcher首页 adb shell dumpsys window | findstr mCurrentFocus 输出 mCurrentFocusWindow{9afb34d u0 com.android.launcher3/com.android.launcher3.Launcher} 找到源码路径 packages/apps/Launcher3/ Android10源码 搜索控件 grep -r -n Apps…

自动驾驶AVM环视算法--python版本的俯视TOP投影模式

c语言版本和算法原理的可以查看本人的其他文档。《自动驾驶AVM环视算法--全景的俯视图像和原图》本文档进用于展示部分代码的视线&#xff0c;获取方式网盘自行获取&#xff08;非免费介意勿下载&#xff09;&#xff1a;链接: https://pan.baidu.com/s/1MJa8ZCEfNyLc5x0uVegto…

前端OpenAPI根据后端Swagger自动生成前端接口报错

测试之后发现是因为Map<Long,List<CommentVO>>的返回值类型的锅&#xff0c;改成Page<List<CommentVO>>即可解决。 前端使用的umiMAX的openapi&#xff0c;报错如下&#xff1a; originalRef: BaseResponseboolean\n "401&q…

java开发入门学习五-流程控制

流程控制语句 if&#xff0c; if...else&#xff0c; if..else if..else 与前端相同 略 switch case 与前端不同的是case不能使用表达式&#xff0c;使用表达式会报错 class TestSwitch {public static void main(String[] args) {// switch 表达式只能是特定的数据类型…

.NET 技术 | 调用系统API创建Windows服务

01阅读须知 此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xf…

《PCI密码卡技术规范》题目

单选1 在《PCI密码卡技术规范》中&#xff0c;下列哪项不属于PCI密码卡的功能&#xff08;&#xff09;。 A.密码运算功能 B.密钥管理功能 C.物理随机数产生功能 D.随主计算机可信检测功能 正确答案&#xff1a;D. <font style"color:#DF2A3F;">解析&…

Java操作Redis-Jedis

介绍 前面我们讲解了Redis的常用命令&#xff0c;这些命令是我们操作Redis的基础&#xff0c;那么我们在 java程序中应该如何操作Redis呢&#xff1f;这就需要使用Redis的Java客户端&#xff0c;就如同我们使 用JDBC操作MySQL数据库一样。 Redis 的 Java 客户端很多&#xff0…

QT网络(二):TCP通信

传输层概念 传输控制协议&#xff08;transmission control protocol&#xff0c;TCP&#xff09;是一种被大多数 Internet 网络协议用于数据传输的底层网络协议&#xff0c;它是可靠的、面向流和连接的传输协议&#xff0c;特别适合用于连续数据传输。 应用层在网络模型中的…

Cherno C++学习笔记 P43 对象生存周期

这篇文章我们讲一下对象的生存周期。这个也是一个很重要的问题&#xff0c;因为我们总说&#xff0c;编程其实就是在操纵内存&#xff0c;而知道了变量如何在栈上生存&#xff0c;以及我们如何利用这些特性来让我们的编程更加简单&#xff0c;我们才是真的理解了编程。我们都知…

02、10个富士胶片模拟的设置

二色彩 1、色彩的加减控制全局的饱和度增减&#xff1b; 2、色彩效果只提升暖色系饱和度&#xff1b; 3、FX蓝色大幅度提升蓝色系饱和度&#xff1b; 4、三个参数都不改变颜色的色相。 2.1 色彩 色彩调整的是拍摄画面整体的色彩饱和程度 2.2色彩效果 调整的是画面中暖色…

【Unity3D】ILRuntime学习记录一

Unity 2019.4.0f1 导入ILRuntime 2.1.0版本 项目目录/Packages/manifest.json添加如下代码&#xff1a; {"scopedRegistries":[{"name":"ILRuntime","url":"https://registry.npmjs.org","scopes":["com.ou…

ECharts柱状图-柱图38,附视频讲解与代码下载

引言&#xff1a; 在数据可视化的世界里&#xff0c;ECharts凭借其丰富的图表类型和强大的配置能力&#xff0c;成为了众多开发者的首选。今天&#xff0c;我将带大家一起实现一个柱状图图表&#xff0c;通过该图表我们可以直观地展示和分析数据。此外&#xff0c;我还将提供…