2024-简单点-MediaCrawler解构

MediaCrawler

  • var.py
  • recv_sms.py
  • async_db.py
  • db.py
  • main.py
  • base/base_crawler.py
  • config/baseconfig.py
  • config/db_config.py
  • 有待更新

在这里插入图片描述

var.py

from asyncio.tasks import Task
from contextvars import ContextVar
from typing import List

import aiomysql

from async_db import AsyncMysqlDB

request_keyword_var: ContextVar[str] = ContextVar("request_keyword", default="")
crawler_type_var: ContextVar[str] = ContextVar("crawler_type", default="")
comment_tasks_var: ContextVar[List[Task]] = ContextVar("comment_tasks", default=[])
media_crawler_db_var: ContextVar[AsyncMysqlDB] = ContextVar("media_crawler_db_var")
db_conn_pool_var: ContextVar[aiomysql.Pool] = ContextVar("db_conn_pool_var")

在这里插入图片描述

recv_sms.py

import re
from typing import List

import redis
import uvicorn
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel

import config
from tools import utils

app = FastAPI()

redis_client = redis.Redis(host=config.REDIS_DB_HOST, password=config.REDIS_DB_PWD)


class SmsNotification(BaseModel):
    platform: str
    current_number: str
    from_number: str
    sms_content: str
    timestamp: str


def extract_verification_code(message: str) -> str:
    """
    Extract verification code of 6 digits from the SMS.
    """
    pattern = re.compile(r'\b[0-9]{6}\b')
    codes: List[str] = pattern.findall(message)
    return codes[0] if codes else ""


@app.post("/")
def receive_sms_notification(sms: SmsNotification):
    """
    Receive SMS notification and send it to Redis.
    Args:
        sms:
            {
                "platform": "xhs",
                "from_number": "1069421xxx134",
                "sms_content": "【小红书】您的验证码是: 171959, 3分钟内有效。请勿向他人泄漏。如非本人操作,可忽略本消息。",
                "timestamp": "1686720601614",
                "current_number": "13152442222"
            }

    Returns:

    """
    utils.logger.info(f"Received SMS notification: {sms.platform}, {sms.current_number}")
    sms_code = extract_verification_code(sms.sms_content)
    if sms_code:
        # Save the verification code in Redis and set the expiration time to 3 minutes.
        key = f"{sms.platform}_{sms.current_number}"
        redis_client.set(key, sms_code, ex=60 * 3)

    return {"status": "ok"}


@app.get("/", status_code=status.HTTP_404_NOT_FOUND)
async def not_found():
    raise HTTPException(status_code=404, detail="Not Found")


if __name__ == '__main__':
    uvicorn.run(app, port=8000, host='0.0.0.0')

在这里插入图片描述

async_db.py

from typing import Any, Dict, List, Union

import aiomysql


class AsyncMysqlDB:
    def __init__(self, pool: aiomysql.Pool) -> None:
        self.__pool = pool

    async def query(self, sql: str, *args: Union[str, int]) -> List[Dict[str, Any]]:
        """
        从给定的 SQL 中查询记录,返回的是一个列表
        :param sql: 查询的sql
        :param args: sql中传递动态参数列表
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, args)
                data = await cur.fetchall()
                return data or []

    async def get_first(self, sql: str, *args: Union[str, int]) -> Union[Dict[str, Any], None]:
        """
        从给定的 SQL 中查询记录,返回的是符合条件的第一个结果
        :param sql: 查询的sql
        :param args:sql中传递动态参数列表
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, args)
                data = await cur.fetchone()
                return data

    async def item_to_table(self, table_name: str, item: Dict[str, Any]) -> int:
        """
        表中插入数据
        :param table_name: 表名
        :param item: 一条记录的字典信息
        :return:
        """
        fields = list(item.keys())
        values = list(item.values())
        fields = [f'`{field}`' for field in fields]
        fieldstr = ','.join(fields)
        valstr = ','.join(['%s'] * len(item))
        sql = "INSERT INTO %s (%s) VALUES(%s)" % (table_name, fieldstr, valstr)
        async with self.__pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql, values)
                lastrowid = cur.lastrowid
                return lastrowid

    async def update_table(self, table_name: str, updates: Dict[str, Any], field_where: str,
                           value_where: Union[str, int, float]) -> int:
        """
        更新指定表的记录
        :param table_name: 表名
        :param updates: 需要更新的字段和值的 key - value 映射
        :param field_where: update 语句 where 条件中的字段名
        :param value_where: update 语句 where 条件中的字段值
        :return:
        """
        upsets = []
        values = []
        for k, v in updates.items():
            s = '`%s`=%%s' % k
            upsets.append(s)
            values.append(v)
        upsets = ','.join(upsets)
        sql = 'UPDATE %s SET %s WHERE %s="%s"' % (
            table_name,
            upsets,
            field_where, value_where,
        )
        async with self.__pool.acquire() as conn:
            async with conn.cursor() as cur:
                rows = await cur.execute(sql, values)
                return rows

    async def execute(self, sql: str, *args: Union[str, int]) -> int:
        """
        需要更新、写入等操作的 excute 执行语句
        :param sql:
        :param args:
        :return:
        """
        async with self.__pool.acquire() as conn:
            async with conn.cursor() as cur:
                rows = await cur.execute(sql, args)
                return rows

在这里插入图片描述

db.py

import asyncio
from typing import Dict
from urllib.parse import urlparse

import aiofiles
import aiomysql

import config
from async_db import AsyncMysqlDB
from tools import utils
from var import db_conn_pool_var, media_crawler_db_var


def parse_mysql_url(mysql_url) -> Dict:
    """
    从配置文件中解析db链接url,给到aiomysql用,因为aiomysql不支持直接以URL的方式传递链接信息。
    Args:
        mysql_url: mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler

    Returns:

    """
    parsed_url = urlparse(mysql_url)
    db_params = {
        'host': parsed_url.hostname,
        'port': parsed_url.port or 3306,
        'user': parsed_url.username,
        'password': parsed_url.password,
        'db': parsed_url.path.lstrip('/')
    }
    return db_params


async def init_mediacrawler_db():
    """
    初始化数据库链接池对象,并将该对象塞给media_crawler_db_var上下文变量
    Returns:

    """
    db_conn_params = parse_mysql_url(config.RELATION_DB_URL)
    pool = await aiomysql.create_pool(
        autocommit=True,
        **db_conn_params
    )
    async_db_obj = AsyncMysqlDB(pool)

    # 将连接池对象和封装的CRUD sql接口对象放到上下文变量中
    db_conn_pool_var.set(pool)
    media_crawler_db_var.set(async_db_obj)


async def init_db():
    """
    初始化db连接池
    Returns:

    """
    utils.logger.info("[init_db] start init mediacrawler db connect object")
    await init_mediacrawler_db()
    utils.logger.info("[init_db] end init mediacrawler db connect object")


async def close():
    """
    关闭连接池
    Returns:

    """
    utils.logger.info("[close] close mediacrawler db pool")
    db_pool: aiomysql.Pool = db_conn_pool_var.get()
    if db_pool is not None:
        db_pool.close()


async def init_table_schema():
    """
    用来初始化数据库表结构,请在第一次需要创建表结构的时候使用,多次执行该函数会将已有的表以及数据全部删除
    Returns:

    """
    utils.logger.info("[init_table_schema] begin init mysql table schema ...")
    await init_mediacrawler_db()
    async_db_obj: AsyncMysqlDB = media_crawler_db_var.get()
    async with aiofiles.open("schema/tables.sql", mode="r") as f:
        schema_sql = await f.read()
        await async_db_obj.execute(schema_sql)
        utils.logger.info("[init_table_schema] mediacrawler table schema init successful")
        await close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(init_table_schema())

在这里插入图片描述
tips:
在这里插入图片描述

main.py

import argparse
import asyncio
import sys

import config
import db
from base.base_crawler import AbstractCrawler
from media_platform.bilibili import BilibiliCrawler
from media_platform.douyin import DouYinCrawler
from media_platform.kuaishou import KuaishouCrawler
from media_platform.weibo import WeiboCrawler
from media_platform.xhs import XiaoHongShuCrawler


class CrawlerFactory:
    CRAWLERS = {
        "xhs": XiaoHongShuCrawler,
        "dy": DouYinCrawler,
        "ks": KuaishouCrawler,
        "bili": BilibiliCrawler,
        "wb": WeiboCrawler
    }

    @staticmethod
    def create_crawler(platform: str) -> AbstractCrawler:
        crawler_class = CrawlerFactory.CRAWLERS.get(platform)
        if not crawler_class:
            raise ValueError("Invalid Media Platform Currently only supported xhs or dy or ks or bili ...")
        return crawler_class()


async def main():
    # define command line params ...
    parser = argparse.ArgumentParser(description='Media crawler program.')
    parser.add_argument('--platform', type=str, help='Media platform select (xhs | dy | ks | bili | wb)',
                        choices=["xhs", "dy", "ks", "bili", "wb"], default=config.PLATFORM)
    parser.add_argument('--lt', type=str, help='Login type (qrcode | phone | cookie)',
                        choices=["qrcode", "phone", "cookie"], default=config.LOGIN_TYPE)
    parser.add_argument('--type', type=str, help='crawler type (search | detail | creator)',
                        choices=["search", "detail", "creator"], default=config.CRAWLER_TYPE)
    parser.add_argument('--start', type=int, help='crawler type (number of start page)',
                         default=config.START_PAGE)
    parser.add_argument('--keywords', type=str, help='crawler type (please input keywords)',
                         default=config.KEYWORDS)
    
    # init db
    if config.SAVE_DATA_OPTION == "db":
        await db.init_db()

    args = parser.parse_args()
    crawler = CrawlerFactory.create_crawler(platform=args.platform)
    crawler.init_config(
        platform=args.platform,
        login_type=args.lt,
        crawler_type=args.type,
        start_page=args.start,
        keyword=args.keywords
    )
    await crawler.start()
    
    if config.SAVE_DATA_OPTION == "db":
        await db.close()


if __name__ == '__main__':
    try:
        # asyncio.run(main())
        asyncio.get_event_loop().run_until_complete(main())
    except KeyboardInterrupt:
        sys.exit()

在这里插入图片描述

base/base_crawler.py

from abc import ABC, abstractmethod
from typing import Dict, Optional

from playwright.async_api import BrowserContext, BrowserType


class AbstractCrawler(ABC):
    @abstractmethod
    def init_config(self, platform: str, login_type: str, crawler_type: str, start_page: int, keyword: str):
        pass

    @abstractmethod
    async def start(self):
        pass

    @abstractmethod
    async def search(self):
        pass

    @abstractmethod
    async def launch_browser(self, chromium: BrowserType, playwright_proxy: Optional[Dict], user_agent: Optional[str],
                             headless: bool = True) -> BrowserContext:
        pass


class AbstractLogin(ABC):
    @abstractmethod
    async def begin(self):
        pass

    @abstractmethod
    async def login_by_qrcode(self):
        pass

    @abstractmethod
    async def login_by_mobile(self):
        pass

    @abstractmethod
    async def login_by_cookies(self):
        pass


class AbstractStore(ABC):
    @abstractmethod
    async def store_content(self, content_item: Dict):
        pass

    @abstractmethod
    async def store_comment(self, comment_item: Dict):
        pass

    # TODO support all platform
    # only xhs is supported, so @abstractmethod is commented
    # @abstractmethod
    async def store_creator(self, creator: Dict):
        pass

class AbstractStoreImage(ABC):
    #TODO: support all platform
    # only weibo is supported
    # @abstractmethod
    async def store_image(self, image_content_item: Dict):
        pass

class AbstractApiClient(ABC):
    @abstractmethod
    async def request(self, method, url, **kwargs):
        pass

    @abstractmethod
    async def update_cookies(self, browser_context: BrowserContext):
        pass

    @abstractmethod
    async def pong(self):
        pass

在这里插入图片描述

config/baseconfig.py

# 基础配置
PLATFORM = "xhs"
KEYWORDS = "python,golang"
LOGIN_TYPE = "qrcode"  # qrcode or phone or cookie
COOKIES = ""
SORT_TYPE = "popularity_descending"  # 具体值参见media_platform.xxx.field下的枚举值,展示只支持小红书
CRAWLER_TYPE = "search"  # 爬取类型,search(关键词搜索) | detail(帖子详情)| creator(创作者主页数据)

# 是否开启 IP 代理
ENABLE_IP_PROXY = False

# 代理IP池数量
IP_PROXY_POOL_COUNT = 2

# 代理IP提供商名称
IP_PROXY_PROVIDER_NAME = "kuaidaili"

# 设置为True不会打开浏览器(无头浏览器)
# 设置False会打开一个浏览器
# 小红书如果一直扫码登录不通过,打开浏览器手动过一下滑动验证码
# 抖音如果一直提示失败,打开浏览器看下是否扫码登录之后出现了手机号验证,如果出现了手动过一下再试。
HEADLESS = True

# 是否保存登录状态
SAVE_LOGIN_STATE = True

# 数据保存类型选项配置,支持三种类型:csv、db、json
SAVE_DATA_OPTION = "json"  # csv or db or json

# 用户浏览器缓存的浏览器文件配置
USER_DATA_DIR = "%s_user_data_dir"  # %s will be replaced by platform name

# 爬取开始页数 默认从第一页开始
START_PAGE = 1

# 爬取视频/帖子的数量控制
CRAWLER_MAX_NOTES_COUNT = 20

# 并发爬虫数量控制
MAX_CONCURRENCY_NUM = 4

# 是否开启爬图片模式, 默认不开启爬图片
ENABLE_GET_IMAGES = False

# 是否开启爬评论模式, 默认不开启爬评论
ENABLE_GET_COMMENTS = False

# 是否开启爬二级评论模式, 默认不开启爬二级评论, 目前仅支持 xhs
# 老版本项目使用了 db, 则需参考 schema/tables.sql line 287 增加表字段
ENABLE_GET_SUB_COMMENTS = False

# 指定小红书需要爬虫的笔记ID列表
XHS_SPECIFIED_ID_LIST = [
    "6422c2750000000027000d88",
    "64ca1b73000000000b028dd2",
    "630d5b85000000001203ab41",
    # ........................
]

# 指定抖音需要爬取的ID列表
DY_SPECIFIED_ID_LIST = [
    "7280854932641664319",
    "7202432992642387233"
    # ........................
]

# 指定快手平台需要爬取的ID列表
KS_SPECIFIED_ID_LIST = [
    "3xf8enb8dbj6uig",
    "3x6zz972bchmvqe"
]

# 指定B站平台需要爬取的视频bvid列表
BILI_SPECIFIED_ID_LIST = [
    "BV1d54y1g7db",
    "BV1Sz4y1U77N",
    "BV14Q4y1n7jz",
    # ........................
]

# 指定微博平台需要爬取的帖子列表
WEIBO_SPECIFIED_ID_LIST = [
    "4982041758140155",
    # ........................
]

# 指定小红书创作者ID列表
XHS_CREATOR_ID_LIST = [
    "63e36c9a000000002703502b",
    # ........................
]

config/db_config.py

import os

# redis config
REDIS_DB_HOST = "127.0.0.1"  # your redis host
REDIS_DB_PWD = os.getenv("REDIS_DB_PWD", "123456")  # your redis password

# mysql config
RELATION_DB_PWD = os.getenv("RELATION_DB_PWD", "123456")  # your relation db password
RELATION_DB_URL = f"mysql://root:{RELATION_DB_PWD}@localhost:3306/media_crawler"

# sqlite3 config
# RELATION_DB_URL = f"sqlite://data/media_crawler.sqlite"

有待更新

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/628209.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【脚本】使用脚本备份docker中部署的mysql数据库

v1版本明文密码方式: #!/bin/bash# 定义 MySQL 容器名称和数据库信息 container_name"mysql_container" db_user"root" db_password"your_password"# 定义要备份的数据库列表 databases("database1" "database2"…

交换机组网最常见的8大故障及解决方式

有朋友多次提到网络故障,其中在交换机组网时常见的故障比较多,为了便于大家排除这些故障,在此介绍一些常见的典型故障案例及处理思路。 故障1:交换机刚加电时网络无法通信 【故障现象】 交换机刚刚开启的时候无法连接至其他网络…

摸鱼大数据——Linux搭建大数据环境(安装zooKeeper和zookeeper shell命令)五

安装zookeeper软件 1.上传软件 使用CRT等客户端远程上传 zookeeper-3.4.6.tar.gz 文件到/export/software目录下 2.解压软件 [rootnode1 ~]# cd /export/software/ [rootnode1 software]# tar -xzvf zookeeper-3.4.6.tar.gz -C /export/server/ [rootnode1 software]# cd /ex…

用友GRP-U8 userInfoWeb SQL注入致RCE漏洞复现 (XVE-2024-10539)

0x01 产品简介 用友GRP-U8R10行政事业内控管理软件是用友公司专注于国家电子政务事业,基于云计算技术所推出的新一代产品,是我国行政事业财务领域最专业的政府财务管理软件。 0x02 漏洞概述 用友GRP-U8R10行政事业内控管理软件 userInfoWeb接口处存在SQL注入漏洞,未授权的…

自由职业是种怎样的体验?普通人如何成为一名自由职业者?

自由职业在哪都能办公自由职业在哪都要办公。 放弃幻想,没有不辛苦的工作,5年经验后端开发程序员,已经从事自由职业1年半,今天就来客观分享一下自由职业的利与弊。 时间自由,减少中间商赚差价 自由职业最让人羡慕的就…

电机完美控制的感觉如何【应用案例】

当电机控制技术成为人体的一部分时,对控制系统的组件尺寸和可靠性要求将极大提高。得益于集成式FOC控制系统组件,第一款具有两个活动关节的义肢得以在短时间内完成—— 赶上在苏黎世举办的全球半机械人奥运会(Cybathlon)。 失去肢体显然会对一个人的生活…

GPT4o速测:约0.5秒延迟的多模态能力

文章目录 1. 测评2. IntroReference 没有剪辑,约0.5秒延迟的多模态能力。 1. 测评 推理速度异常快,比之前快了大概两三倍,对产品端来说是个很好的事情,想用gpt4级别性能终于可以少讨论几句时延影响用户体验了模型指令遵从能力变强…

GPT3.5、GPT4、GPT4o的性能对比

理论总结 随着版本的升级,模型在参数数量、语言理解能力、生成文本质量、多模态能力、推理能力等方面均有显著提升。GPT-4.0作为最新改进版,提供了最先进的功能和性能。 实际对比 1.1.GPT3.5 1.2.GPT4 1.3.GPT4o 在语义理解上,无差别。 下面测试下代码能力。 测试问题 我…

电脑的录屏功能在哪?一篇文章教你快速定位

“电脑录屏功能在哪里呀?因为需要录制一些教学视频,急需用到电脑的录屏功能。我已经在电脑上翻箱倒柜地找了好几个小时,可还是没有找到。时间紧迫,这个任务对我来说非常重要,我现在感到非常焦虑,希望大家帮…

YOLOv8独家改进:逐元素乘法(star operation)二次创新 | 微软新作StarNet:超强轻量级Backbone CVPR 2024

💡💡💡创新点:star operation(元素乘法)在无需加宽网络下,将输入映射到高维非线性特征空间的能力,逐元素乘法(star operation)在性能上始终优于求和,基于star operation块做二次创新 💡💡💡如何跟YOLOv8结合:替代YOLOv8的C2f,结构图如下 收录 YOLOv8…

压力给到 Google,OpenAI 发布 GPT-4o 来了

北京时间5月14日凌晨1点,OpenAI 开启了今年的第一次直播,根据官方消息,这次旨在演示 ChatGPT 和 GPT-4 的升级内容。在早些时候 Sam Altman 在 X 上已经明确,「我们一直在努力开发一些我们认为人们会喜欢的新东西,对我…

炫富神器,简单无脑粘贴复制,闷声发财,当天见收益,无上限封顶

项目主打简单、暴力、易操作、可复制,单人可做、不靠关系走门路、不重投资、可复制放大! 今天给大家带来的这个项目,有点暴力,请先做好心理准备!谨慎观看!! 这个项目原理是利用软件生成炫富视频…

数据结构——队列(链表实现)

一、队列的特点 先进先出 二、队列的代码 typedef int QDataType;// 链式结构:表示队列 typedef struct QListNode {struct QListNode* next;QDataType data; }QNode;// 队列的结构 typedef struct Queue {QNode* front; //指向队列的第一个结点QNode* rear;//指…

基于uniapp+vue3+ts开发微信小程序项目实战

🚀 作者 :“二当家-小D” 🚀 博主简介:⭐前荔枝FM架构师、阿里资深工程师||曾任职于阿里巴巴担任多个项目负责人,8年开发架构经验,精通java,擅长分布式高并发架构,自动化压力测试,微服务容器化k…

香港电讯高效网络,助力新消费品牌抓住拓展香港市场新风口

自今年初香港与内地全面恢复通关,两地同胞跨境消费热潮持续升温。港人“北上”消费掀起风潮的同时,香港市场也成为内地新消费品牌拓展的热门目标。从糕点、茶饮、连锁餐饮到服饰,越来越多内地品牌进驻香港。新消费品牌要想在香港开设门店&…

气膜建筑会漏气吗—轻空间

气膜建筑作为一种创新的建筑形式,其主要结构依靠充气系统提供源源不断的风力,以维持内部气压,从而支撑起整个膜体,抵御外部的风雪荷载。然而,气膜建筑能否保持完全的密封性,是否会漏气,是许多用…

python批量生成验证码,python生成验证码

欢迎关注我👆,收藏下次不迷路┗|`O′|┛ 嗷~~ 目录 一.前言 二.代码 三.使用 四.总结 一.前言 验证码(CAPTCHA)是“Completely Automated Public Turing test to tell Computers and Human

国标GB28181协议EasyCVR视频汇聚平台获取设备录像仅展示部分片段的原因排查

国标GB28181协议EasyCVR安防平台可以提供实时远程视频监控、视频录像、录像回放与存储、告警、语音对讲、云台控制、平台级联、磁盘阵列存储、视频集中存储、云存储等丰富的视频能力,平台支持7*24小时实时高清视频监控,能同时播放多路监控视频流&#xf…

抖店曝光率高,转化低,不知道怎么提升转化率?试试这四个方法

大家好,我是醒醒团队电商花花。 我们现在做抖音小店的商家或多或少都会遇到不出单,转化低的各种问题。 明明店铺的曝光不低,访客也不少,就是没转化。 下面我根据我们做店的经验,给大家分享一些问题所在,…

从零开始成为网络安全工程师:提高竞争力的秘诀

在当今数字化和互联网化的时代,网络安全工程师的职责越来越重要。然而,网络安全行业发展迅速,竞争也越来越激烈。要成为一名有竞争力的网络安全工程师,需要有一定的技能和经验,同时要不断提升自己的能力。下面是坤哥结…