爬取历史天气网站数据
从天气网站爬取指定城市、指定时间范围内的天气数据,并将数据保存为CSV文件。具体而言,它使用了Selenium库来模拟浏览器行为,以便获取动态加载的页面内容。
主要步骤如下:
- 读取城市信息和代理IP信息,并初始化Chrome浏览器及其设置。
- 提示用户输入爬取的起始年份、结束年份、起始月份和结束月份。
- 启动Chrome浏览器调试服务,并创建Chrome驱动。
- 定义了一个名为crawl_aqi_data()的函数,用于爬取指定城市、指定年份和月份的天气数据,并将数据保存到CSV文件中。在函数内部,它首先构建了目标URL,然后打开URL,等待页面加载完成。接着,创建了保存数据的目录,并打开CSV文件进行写入。在读取数据时,它通过Selenium定位到页面上相应的元素,提取数据,并写入CSV文件。
- 循环遍历所有城市、年份和月份,调用crawl_aqi_data()函数执行爬取任务,并在每次爬取后休眠1秒。
- 最后,关闭浏览器,并输出"数据爬取完成。"
- 需要注意的是,为了避免被网站封禁IP,程序在每次爬取后都会休眠1秒,并且随机选择一个代理IP来访问网站
import os
import time
import subprocess
from selenium import webdriver
from selenium.webdriver.common.by import By
import csv
import pinyin
from fake_useragent import UserAgent
import random
# 读取城市信息
with open("city.txt", "r", encoding="utf-8") as file:
cities = [line.strip() for line in file if line.strip()]
ips =[]
with open('ip.txt', 'r') as f:
for line in f:
ip = line.strip()
ips.append(ip.strip())
# 输入爬取的起始年份、结束年份、起始月份和结束月份
start_year = int(input("请输入起始年份: "))
end_year = int(input("请输入结束年份: "))
start_month = int(input("请输入起始月份: "))
end_month = int(input("请输入结束月份: "))
# 启动Chrome浏览器调试服务
subprocess.Popen('cmd', shell=True)
subprocess.Popen('"chrome-win64\chrome.exe" --remote-debugging-port=9222', shell=True)
# 创建Chrome驱动
chrome_options = webdriver.ChromeOptions()
chrome_options.add_experimental_option("debuggerAddress", "localhost:9222")
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable‐gpu')
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.add_argument('--proxy-server=http://' + random.choice(ips))
chrome_options.add_argument(f"user-agent={UserAgent().random}")
driver = webdriver.Chrome(options=chrome_options)
def crawl_aqi_data(city, year,month):
# 构建URL
city1 = pinyin.get(city, format="strip")
url = f"https://m.tianqi.com/lishi/{city1.lower()}/{year}{month:02}.html"
# 打开URL https://m.tianqi.com/lishi/shanghai/202401.html
driver.get(url)
time.sleep(3) # 等待页面加载完成
# 创建目录
folder_path = os.getcwd()+f"\data\天气/{city}/{year}年"
if not os.path.exists(folder_path):
os.makedirs(folder_path)
# 打开CSV文件
with open(f"{folder_path}/{year}年{month:02}月.csv", "w", newline='', encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
# 写入表头
#writer.writerow(["日期", "AQI指数", "质量等级", "PM2.5", "PM10", "SO2", "CO", "NO2", "O3-8h"])
# 读取数据并保存到CSV文件
rows = driver.find_elements(By.CSS_SELECTOR, "body > div.history_weatherct1 > div.count_temp > table > tbody > tr")
for i, row in enumerate(rows):
if i == 0: # 只写入第一行数据
data = [cell.text for cell in row.find_elements(By.TAG_NAME, "td")]
if data:
print(data)
writer.writerow(["平均高温℃","平均低温℃"])
writer.writerow([data[0].split("\n")[-1].replace("℃",""),data[1].split("\n")[-1].replace("℃","")])
# 循环爬取指定城市、指定时间段的数据
for city in cities:
for year in range(start_year, end_year + 1):
for month in range(start_month, end_month + 1):
crawl_aqi_data(city, year, month)
time.sleep(1)
# 关闭浏览器
driver.quit()
print("数据爬取完成。")
封禁ip
温度可视化
import os
import pandas as pd
import matplotlib.pyplot as plt
# 读取CSV文件并加载数据
def load_data(file_path):
if os.path.exists(file_path):
if os.stat(file_path).st_size == 0:
print(f"File '{file_path}' is empty.")
return None
df = pd.read_csv(file_path)
return df
else:
print(f"File '{file_path}' does not exist.")
return None
# 可视化数据
def visualize_temperatures(df):
# 可视化显示
plt.figure(figsize=(10, 6))
plt.plot(df['城市'], df['平均高温℃'], marker='o', color='red', linestyle='-', label='平均高温')
plt.plot(df['城市'], df['平均低温℃'], marker='o', color='blue', linestyle='-')
plt.xlabel('城市')
plt.ylabel('温度')
plt.title('各城市温度对比')
plt.xticks(rotation=90) # 旋转x轴标签,以便更好地显示城市名
plt.grid(True) # 显示网格线
plt.tight_layout() # 调整布局,防止标签重叠
plt.show()
# 主函数
def main():
# 设置中文显示
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
# 指定天气目录路径
weather_dir = os.path.join(os.getcwd(), 'data', '天气')
all_df = []
# 遍历天气目录下的城市数据
for city_name in os.listdir(weather_dir):
city_dir = os.path.join(weather_dir, city_name)
if os.path.isdir(city_dir):
# 遍历城市文件夹下的所有 CSV 文件
for year in os.listdir(city_dir):
year_dir = os.path.join(city_dir, year)
for csv_file in os.listdir(year_dir):
if csv_file.endswith(".csv"):
# 构建文件路径
file_path = os.path.join(year_dir, csv_file)
# 加载数据
df = load_data(file_path)
# 添加城市列
df['城市'] = city_name
all_df.append(df)
rged_df = pd.concat(all_df, ignore_index=True)
print(rged_df)
# 可视化数据
visualize_temperatures(rged_df)
if __name__ == "__main__":
main()