MediaCrawler
- var.py
- recv_sms.py
- async_db.py
- db.py
- main.py
- base/base_crawler.py
- config/baseconfig.py
- config/db_config.py
To be updated.
var.py
from asyncio.tasks import Task
from contextvars import ContextVar
from typing import List

import aiomysql

from async_db import AsyncMysqlDB

request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])
media_crawler_db_var: ContextVar[AsyncMysqlDB] = ContextVar("media_crawler_db_var")
db_conn_pool_var: ContextVar[aiomysql.Pool] = ContextVar("db_conn_pool_var")
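These ContextVars give each asyncio task its own view of shared crawler state (the current keyword, the crawler type, pending comment tasks, and the DB handles). A minimal standalone sketch of that behavior, using only the variables defined above:

import asyncio

from var import crawler_type_var, request_keyword_var


async def worker(keyword: str) -> None:
    # Each task runs in a copy of the context, so set() here does not leak into other tasks.
    request_keyword_var.set(keyword)
    await asyncio.sleep(0)
    print(crawler_type_var.get(), request_keyword_var.get())


async def demo() -> None:
    crawler_type_var.set("search")  # visible to tasks created afterwards
    await asyncio.gather(worker("python"), worker("golang"))


if __name__ == "__main__":
    asyncio.run(demo())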
recv_sms.py
import re
from typing import List

import redis
import uvicorn
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

import config
from tools import utils

app = FastAPI()

redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)


class SmsNotification(BaseModel):
    platform: str
    current_number: str
    from_number: str
    sms_content: str
    timestamp: str


def extract_verification_code(message: str) -> str:
    """
    Extract a 6-digit verification code from the SMS content.
    """
    pattern = re.compile(r'\b[0-9]{6}\b')
    codes: List[str] = pattern.findall(message)
    return codes[0] if codes else ""


@app.post("/")
def receive_sms_notification(sms: SmsNotification):
    """
    Receive an SMS notification and cache the verification code in Redis.
    Args:
        sms:
            {
                "platform": "xhs",
                "from_number": "1069421xxx134",
                "sms_content": "【小红书】您的验证码是: 171959, 3分钟内有效。请勿向他人泄漏。如非本人操作,可忽略本消息。",
                "timestamp": "1686720601614",
                "current_number": "13152442222"
            }
    Returns:
    """
    utils.logger.info(f"Received SMS notification: {sms.platform}, {sms.current_number}")
    sms_code = extract_verification_code(sms.sms_content)
    if sms_code:
        # Save the verification code in Redis with a 3-minute expiration.
        key = f"{sms.platform}_{sms.current_number}"
        redis_client.set(key, sms_code, ex=60 * 3)
    return {"status": "ok"}


@app.get("/", status_code=status.HTTP_404_NOT_FOUND)
async def not_found():
    raise HTTPException(status_code=404, detail="Not Found")


if __name__ == '__main__':
    uvicorn.run(app, port=8000, host='0.0.0.0')
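On the crawler side, the phone-number login flow can poll Redis for the code that recv_sms.py cached under the {platform}_{phone_number} key. A minimal sketch assuming the same Redis settings; wait_for_sms_code is a hypothetical helper, not code from the repo:

import time

import redis

import config


def wait_for_sms_code(platform: str, phone_number: str, timeout: int = 120) -> str:
    """Poll Redis for the verification code cached by recv_sms.py (hypothetical helper)."""
    client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)
    key = f"{platform}_{phone_number}"  # same key format written by receive_sms_notification
    deadline = time.time() + timeout
    while time.time() < deadline:
        value = client.get(key)
        if value:
            return value.decode()
        time.sleep(1)
    return ""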
async_db.py
from typing import Any, Dict, List, Union

import aiomysql


class AsyncMysqlDB:
    def __init__(self, pool: aiomysql.Pool) -> None:
        self.__pool = pool

    async def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]:
        """
        Run the given SQL query and return all matching rows as a list of dicts.
        :param sql: SQL query string
        :param args: positional parameters for the SQL placeholders
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, args)
                data = await cur.fetchall()
                return data or []

    async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]:
        """
        Run the given SQL query and return only the first matching row.
        :param sql: SQL query string
        :param args: positional parameters for the SQL placeholders
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, args)
                data = await cur.fetchone()
                return data

    async def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int:
        """
        Insert a single record into the given table.
        :param table_name: table name
        :param item: dict of column name -> value for one record
        :return:
        """
        fields = list(item.keys())
        values = list(item.values())
        fields = [f'`{field}`' for field in fields]
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        sql = "INSERT INTO %s (%s) VALUES(%s)" % (table_name, fieldstr, valstr)
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, values)
                lastrowid = cur.lastrowid
                return lastrowid

    async def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str,
                           value_where: Union[str, int, float]) -> int:
        """
        Update records in the given table.
        :param table_name: table name
        :param updates: mapping of column names to their new values
        :param field_where: column name used in the WHERE clause
        :param value_where: column value used in the WHERE clause
        :return:
        """
        upsets = []
        values = []
        for k, v in updates.items():
            s = '`%s`=%%s' % k
            upsets.append(s)
            values.append(v)
        upsets = ','.join(upsets)
        sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
            table_name,
            upsets,
            field_where, value_where,
        )
        async with self.__pool.acquire() as conn:
            async with conn.cursor() as cur:
                rows = await cur.execute(sql, values)
                return rows

    async def execute(self, sql: str, *args: Union[str, int]) -> int:
        """
        Execute a statement that writes or updates data.
        :param sql:
        :param args:
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor() as cur:
                rows = await cur.execute(sql, args)
                return rows
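A short usage sketch of AsyncMysqlDB outside the crawler. The connection parameters and demo_table (with id, name, count columns) are illustrative assumptions, not part of the project schema:

import asyncio

import aiomysql

from async_db import AsyncMysqlDB


async def demo() -> None:
    pool = await aiomysql.create_pool(host="localhost", port=3306, user="root",
                                      password="123456", db="media_crawler", autocommit=True)
    db = AsyncMysqlDB(pool)
    new_id = await db.item_to_table("demo_table", {"name": "hello", "count": 1})
    await db.update_table("demo_table", {"count": 2}, "id", new_id)
    row = await db.get_first("SELECT * FROM demo_table WHERE id = %s", new_id)
    print(new_id, row)
    pool.close()
    await pool.wait_closed()


if __name__ == "__main__":
    asyncio.run(demo())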
db.py
import asyncio
from typing import Dict
from urllib.parse import urlparse

import aiofiles
import aiomysql

import config
from async_db import AsyncMysqlDB
from tools import utils
from var import db_conn_pool_var, media_crawler_db_var


def parse_mysql_url(mysql_url) -> Dict:
    """
    Parse the db connection URL from the config into the parameters aiomysql needs,
    because aiomysql does not accept connection info as a URL directly.
    Args:
        mysql_url: mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler
    Returns:
    """
    parsed_url = urlparse(mysql_url)
    db_params = {
        'host': parsed_url.hostname,
        'port': parsed_url.port or 3306,
        'user': parsed_url.username,
        'password': parsed_url.password,
        'db': parsed_url.path.lstrip('/')
    }
    return db_params


async def init_mediacrawler_db():
    """
    Initialize the database connection pool and store it in the media_crawler_db_var context variable.
    Returns:
    """
    db_conn_params = parse_mysql_url(config.RELATION_DB_URL)
    pool = await aiomysql.create_pool(
        autocommit=True,
        **db_conn_params
    )
    async_db_obj = AsyncMysqlDB(pool)

    # Put the connection pool and the wrapped CRUD interface into context variables.
    db_conn_pool_var.set(pool)
    media_crawler_db_var.set(async_db_obj)


async def init_db():
    """
    Initialize the db connection pool.
    Returns:
    """
    utils.logger.info("[init_db] start init mediacrawler db connect object")
    await init_mediacrawler_db()
    utils.logger.info("[init_db] end init mediacrawler db connect object")


async def close():
    """
    Close the connection pool.
    Returns:
    """
    utils.logger.info("[close] close mediacrawler db pool")
    db_pool: aiomysql.Pool = db_conn_pool_var.get()
    if db_pool is not None:
        db_pool.close()


async def init_table_schema():
    """
    Initialize the database table schema. Run it only the first time you need to create the tables;
    running it again will drop all existing tables and their data.
    Returns:
    """
    utils.logger.info("[init_table_schema] begin init mysql table schema ...")
    await init_mediacrawler_db()
    async_db_obj: AsyncMysqlDB = media_crawler_db_var.get()
    async with aiofiles.open("schema/tables.sql", mode="r") as f:
        schema_sql = await f.read()
        await async_db_obj.execute(schema_sql)
        utils.logger.info("[init_table_schema] mediacrawler table schema init successful")
        await close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(init_table_schema())
Tips: if you choose the db storage option (SAVE_DATA_OPTION = "db"), run python db.py once first to create the table schema from schema/tables.sql; running it again will drop the existing tables and their data.
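Once init_db() has run, any coroutine in the same context tree can fetch the shared AsyncMysqlDB object from the context variable; this is how the store layer gets its DB handle. A minimal sketch:

import asyncio

import db
from var import media_crawler_db_var


async def demo() -> None:
    await db.init_db()
    async_db = media_crawler_db_var.get()  # same object the store layer uses
    print(await async_db.query("SHOW TABLES"))
    await db.close()


if __name__ == "__main__":
    asyncio.run(demo())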
main.py
import argparse
import asyncio
import sys

import config
import db
from base.base_crawler import AbstractCrawler
from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler


class CrawlerFactory:
    CRAWLERS = {
        "xhs": XiaoHongShuCrawler,
        "dy": DouYinCrawler,
        "ks": KuaishouCrawler,
        "bili": BilibiliCrawler,
        "wb": WeiboCrawler
    }

    @staticmethod
    def create_crawler(platform: str) -> AbstractCrawler:
        crawler_class = CrawlerFactory.CRAWLERS.get(platform)
        if not crawler_class:
            raise ValueError("Invalid media platform. Currently supported: xhs | dy | ks | bili | wb")
        return crawler_class()


async def main():
    # define command line params ...
    parser = argparse.ArgumentParser(description='Media crawler program.')
    parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)',
                        choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
    parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
                        choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
    parser.add_argument('--type', type=str, help='Crawler type (search | detail | creator)',
                        choices=["search", "detail", "creator"], default=config.CRAWLER_TYPE)
    parser.add_argument('--start', type=int, help='Number of the start page',
                        default=config.START_PAGE)
    parser.add_argument('--keywords', type=str, help='Search keywords, comma separated',
                        default=config.KEYWORDS)

    # init db
    if config.SAVE_DATA_OPTION == "db":
        await db.init_db()

    args = parser.parse_args()
    crawler = CrawlerFactory.create_crawler(platform=args.platform)
    crawler.init_config(
        platform=args.platform,
        login_type=args.lt,
        crawler_type=args.type,
        start_page=args.start,
        keyword=args.keywords
    )
    await crawler.start()

    if config.SAVE_DATA_OPTION == "db":
        await db.close()


if __name__ == '__main__':
    try:
        # asyncio.run(main())
        asyncio.get_event_loop().run_until_complete(main())
    except KeyboardInterrupt:
        sys.exit()
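Typical invocations with the flags defined above (search mode crawls by keyword; detail mode is driven by the *_SPECIFIED_ID_LIST values in config):

    python main.py --platform xhs --lt qrcode --type search --keywords "python,golang"
    python main.py --platform bili --lt cookie --type detail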
base/base_crawler.py
from abc import ABC, abstractmethod
from typing import Dict, Optional

from playwright.async_api import BrowserContext, BrowserType


class AbstractCrawler(ABC):
    @abstractmethod
    def init_config(self, platform: str, login_type: str, crawler_type: str, start_page: int, keyword: str):
        pass

    @abstractmethod
    async def start(self):
        pass

    @abstractmethod
    async def search(self):
        pass

    @abstractmethod
    async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str],
                             headless: bool = True) -> BrowserContext:
        pass


class AbstractLogin(ABC):
    @abstractmethod
    async def begin(self):
        pass

    @abstractmethod
    async def login_by_qrcode(self):
        pass

    @abstractmethod
    async def login_by_mobile(self):
        pass

    @abstractmethod
    async def login_by_cookies(self):
        pass


class AbstractStore(ABC):
    @abstractmethod
    async def store_content(self, content_item: Dict):
        pass

    @abstractmethod
    async def store_comment(self, comment_item: Dict):
        pass

    # TODO support all platform
    # only xhs is supported, so @abstractmethod is commented
    # @abstractmethod
    async def store_creator(self, creator: Dict):
        pass


class AbstractStoreImage(ABC):
    # TODO: support all platform
    # only weibo is supported
    # @abstractmethod
    async def store_image(self, image_content_item: Dict):
        pass


class AbstractApiClient(ABC):
    @abstractmethod
    async def request(self, method, url, **kwargs):
        pass

    @abstractmethod
    async def update_cookies(self, browser_context: BrowserContext):
        pass

    @abstractmethod
    async def pong(self):
        pass
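A new platform crawler plugs in by subclassing AbstractCrawler and implementing its four abstract methods. A minimal skeleton (DemoCrawler is hypothetical; the real implementations live under media_platform/):

from typing import Dict, Optional

from playwright.async_api import BrowserContext, BrowserType

from base.base_crawler import AbstractCrawler


class DemoCrawler(AbstractCrawler):
    """Hypothetical skeleton showing which methods a new platform crawler must implement."""

    def init_config(self, platform: str, login_type: str, crawler_type: str, start_page: int, keyword: str):
        self.platform = platform
        self.login_type = login_type
        self.crawler_type = crawler_type
        self.start_page = start_page
        self.keyword = keyword

    async def start(self):
        # Launch the browser, log in, then dispatch to search/detail/creator logic.
        ...

    async def search(self):
        # Page through keyword search results starting at self.start_page.
        ...

    async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict],
                             user_agent: Optional[str], headless: bool = True) -> BrowserContext:
        browser = await chromium.launch(headless=headless, proxy=playwright_proxy)
        return await browser.new_context(user_agent=user_agent)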
config/baseconfig.py
# Base configuration
PLATFORM = "xhs"
KEYWORDS = "python,golang"
LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
COOKIES = ""
SORT_TYPE = "popularity_descending"  # see the enum values under media_platform.xxx.field; currently only supported for xhs
CRAWLER_TYPE = "search"  # crawl type: search (keyword search) | detail (post detail) | creator (creator homepage data)

# Whether to enable IP proxies
ENABLE_IP_PROXY = False

# Size of the proxy IP pool
IP_PROXY_POOL_COUNT = 2

# Proxy IP provider name
IP_PROXY_PROVIDER_NAME = "kuaidaili"

# True runs without a visible browser window (headless); False opens a browser window.
# XHS: if QR-code login keeps failing, open the browser and pass the slider captcha manually.
# Douyin: if login keeps failing, open the browser and check whether a phone-number verification
# appears after scanning; if so, complete it manually and retry.
HEADLESS = True

# Whether to save the login state
SAVE_LOGIN_STATE = True

# Data storage option, three types supported: csv, db, json
SAVE_DATA_OPTION = "json"  # csv or db or json

# Browser user-data directory used to cache the login state
USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name

# Page to start crawling from (defaults to the first page)
START_PAGE = 1

# Maximum number of notes/videos to crawl
CRAWLER_MAX_NOTES_COUNT = 20

# Maximum number of concurrent crawler tasks
MAX_CONCURRENCY_NUM = 4

# Whether to crawl images (disabled by default)
ENABLE_GET_IMAGES = False

# Whether to crawl comments (disabled by default)
ENABLE_GET_COMMENTS = False

# Whether to crawl second-level (reply) comments (disabled by default; currently only xhs)
# If an older deployment of this project already uses db storage, add the new column per schema/tables.sql line 287.
ENABLE_GET_SUB_COMMENTS = False

# XHS note IDs to crawl in detail mode
XHS_SPECIFIED_ID_LIST = [
    "6422c2750000000027000d88",
    "64ca1b73000000000b028dd2",
    "630d5b85000000001203ab41",
    # ........................
]

# Douyin video IDs to crawl in detail mode
DY_SPECIFIED_ID_LIST = [
    "7280854932641664319",
    "7202432992642387233"
    # ........................
]

# Kuaishou video IDs to crawl in detail mode
KS_SPECIFIED_ID_LIST = [
    "3xf8enb8dbj6uig",
    "3x6zz972bchmvqe"
]

# Bilibili video bvids to crawl in detail mode
BILI_SPECIFIED_ID_LIST = [
    "BV1d54y1g7db",
    "BV1Sz4y1U77N",
    "BV14Q4y1n7jz",
    # ........................
]

# Weibo post IDs to crawl in detail mode
WEIBO_SPECIFIED_ID_LIST = [
    "4982041758140155",
    # ........................
]

# XHS creator IDs to crawl in creator mode
XHS_CREATOR_ID_LIST = [
    "63e36c9a000000002703502b",
    # ........................
]
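All of these are plain module-level constants read via import config. A small illustration of the USER_DATA_DIR placeholder mentioned above (the exact call site inside the crawlers may differ):

import config

# "%s_user_data_dir" becomes "xhs_user_data_dir" when PLATFORM is "xhs"
print(config.USER_DATA_DIR % config.PLATFORM)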
config/db_config.py
import os
# redis config
REDIS_DB_HOST = "127.0.0.1" # your redis host
REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456") # your redis password
# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456") # your relation db password
RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"
# sqlite3 config
# RELATION_DB_URL = f"sqlite://data/media_crawler.sqlite"
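For reference, db.parse_mysql_url (shown earlier) turns RELATION_DB_URL into the keyword arguments aiomysql expects; with the defaults above and no environment variables set, it yields:

from db import parse_mysql_url

import config

print(parse_mysql_url(config.RELATION_DB_URL))
# {'host': 'localhost', 'port': 3306, 'user': 'root', 'password': '123456', 'db': 'media_crawler'}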