某东抢购某台脚本——高成功率
小白操作-仅供学习参考
说明
这段代码主要关联了许多网络请求和对应的业务逻辑处理,用于处理与一个名为“茅台商城”的应用相关的网络操作。主要功能和关键组件的详细说明如下:
- 全局变量和配置:
-
使用AES加密密钥(
AES_KEY
)、初始向量(AES_IV
)和一个盐值(SALT
)用于加密相关操作。 -
定义了当前时间戳
current_time
和一组请求头headers
。 -
从苹果应用商店拉取并解析应用的当前版本号。
- 初始化请求头:
init_headers
函数通过解析一个多行的字符串header_context
来初始化请求头。
- 签名生成:
signature
函数使用MD5算法生成请求的签名,签名用于后续的网络请求验证。
- 用户交互函数:
-
get_vcode
用于获取短信验证码。 -
login
用于用户登录,使用手机号和验证码。 -
get_current_session_id
和get_day_time
用于获取当前的会话ID和准确的时间戳。
- 地理位置和商店处理:
-
get_location_count
查询特定省市下的商店库存。 -
distance_shop
和max_shop
用于基于不同标准(如距离和库存最大化)选择商店。 -
select_geo
用于地理编码,将地址转换成地理坐标。
- 加密和动作参数构建:
-
使用
Encrypt
类(从encrypt
模块导入)对参数进行AES加密。 -
act_params
用于构建活动参数,这些参数包括了用户、商品和店铺的ID等。
- 通知和预约功能:
-
send_email
函数通过Pushplus服务发送通知。 -
reservation
用于提交商品预约请求。
- 实用功能:
-
get_map
用于获取商店的地理位置数据。 -
getUserEnergyAward
用于领取用户能量奖励,这可能是应用内的某种积分或奖励系统。
部分源码-脱敏
def send_email(msg: str):
if config.PUSH_TOKEN is None:
return
title = 'imoutai预约失败' # 改成你要的标题内容
content = msg # 改成你要的正文内容
url = 'http://www.pushplus.plus/send'
r = requests.get(url, params={'token': config.PUSH_TOKEN,
'title': title,
'content': content})
logging.info(f'通知推送结果:{r.status_code, r.text}')
def reservation(params: dict, mobile: str):
params.pop('userId')
responses = requests.post("https://app.moutai519.com.cn/xhr/front/mall/reservation/add", json=params,
headers=headers)
if responses.status_code == 401:
send_email(f'[{mobile}],登录token失效,需要重新登录')
raise RuntimeError
if '您的实名信息未完善或未通过认证' in responses.text:
send_email(f'[{mobile}],{responses.text}')
raise RuntimeError
logging.info(
f'预约 : mobile:{mobile} : response code : {responses.status_code}, response body : {responses.text}')
def select_geo(i: str):
# https://www.piliang.tech/geocoding-amap
resp = requests.get(f"https://www.piliang.tech/api/amap/geocode?address={i}")
geocodes: list = resp.json()['geocodes']
return geocodes
def get_map(lat: str = '28.499562', lng: str = '102.182324'):
p_c_map = {}
url = 'https://static.moutai519.com.cn/mt-backend/xhr/front/mall/resource/get'
headers = {
'X-Requested-With': 'XMLHttpRequest',
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 15_0_1 like Mac OS X)',
'Referer': 'https://h5.moutai519.com.cn/gux/game/main?appConfig=2_1_2',
'Client-User-Agent': 'iOS;16.0.1;Apple;iPhone 14 ProMax',
'MT-R': 'clips_OlU6TmFRag5rCXwbNAQ/Tz1SKlN8THcecBp/HGhHdw==',
'Origin': 'https://h5.moutai519.com.cn',
'MT-APP-Version': mt_version,
'MT-Request-ID': f'{int(time.time() * 1000)}{random.randint(1111111, 999999999)}{int(time.time() * 1000)}',
'Accept-Language': 'zh-CN,zh-Hans;q=1',
'MT-Device-ID': f'{int(time.time() * 1000)}{random.randint(1111111, 999999999)}{int(time.time() * 1000)}',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'mt-lng': f'{lng}',
'mt-lat': f'{lat}'
}
res = requests.get(url, headers=headers, )
mtshops = res.json().get('data', {}).get('mtshops_pc', {})
urls = mtshops.get('url')
r = requests.get(urls)
for k, v in dict(r.json()).items():
provinceName = v.get('provinceName')
cityName = v.get('cityName')
if not p_c_map.get(provinceName):
p_c_map[provinceName] = {}
if not p_c_map[provinceName].get(cityName, None):
p_c_map[provinceName][cityName] = [k]
else:
p_c_map[provinceName][cityName].append(k)
return p_c_map, dict(r.json())
def getUserEnergyAward(mobile: str):
"""
"""
cookies = {
'MT-Device-ID-Wap': headers['MT-Device-ID'],
'MT-Token-Wap': headers['MT-Token'],
'YX_SUPPORT_WEBP': '1',
}
response = requests.post('https://h5.moutai519.com.cn/game/isolationPage/getUserEnergyAward', cookies=cookies,
headers=headers, json={})
# response.json().get('message') if '无法领取奖励' in response.text else "领取奖励成功"
logging.info(
f' : mobile:{mobile} : response code : {response.status_code}, response body : {response.text}')