基于Spark的气象数据处理与分析

文章目录

  • 一、实验环境
  • 二、实验数据介绍
  • 三、数据获取
    • 1.观察数据获取方式
    • 2.数据爬取
    • 3.数据存储
    • 4.数据读取
    • 5.数据结构
    • 6.爬虫过程截图
  • 四、数据分析
    • 1.计算各个城市过去24小时累积雨量
    • 2.计算各个城市当日平均气温
    • 3.计算各个城市当日平均湿度
    • 4.计算各个城市当日平均风速
  • 五、数据可视化
  • 六、数据以及源代码
    • 1.爬取的数据截图:
    • 2.爬虫代码
    • 3.Spark分析代码:

本实验采用Python语言,从网页爬取气象数据,并使用大数据处理框架Spark对气象数据进行处理分析,并对分析结果进行可视化。

一、实验环境

(1)Linux: Ubuntu 20.04
(2)Python: 3.6
(3)Spark: 3.2.0
(4)pycharm
安装完上述环境以后,为了支持Python可视化分析,还需要执行如下命令安装新的组件:

二、实验数据介绍

本次实验所采用的数据,从中央气象台官方网站(网址:http://www.nmc.cn/)爬取,主要是最近24小时各个城市的天气数据,包括时间点(整点)、整点气温、整点降水量、风力、整点气压、相对湿度等。正常情况下,每个城市会对应24条数据(每个整点一条)。数据规模达到2429个城市,58297条数据,有部分城市部分时间点数据存在缺失或异常。限于本次大作业时间有限,没有办法全面分析这些数据,大作业中主要计算分析了各个城市过去24小时的平均气温和降水量情况。
在这里插入图片描述
图 1 中央气象台官网
在这里插入图片描述
图 2 爬取的表格passed_weather_ALL.csv信息

三、数据获取

1.观察数据获取方式

打开中央气象台官方网站(网址:http://www.nmc.cn/),任意点击左侧栏“热点城市”中的一个城市。打开火狐(Firefox)浏览器或者谷歌(chrome)浏览器的Web控制台。通过切换“省份”和“城市”,我们可以发现,网页中的数据是以json字符串格式异步地从服务器传送。可以发现以下数据和请求URL的关系。
在这里插入图片描述

请求URL 传回数据
http://www.nmc.cn/f/rest/province 省份数据
在这里插入图片描述

http://www.nmc.cn/f/rest/province/+省份三位编码 某个省份的城市数据
在这里插入图片描述

http://www.nmc.cn/f/rest/passed/+城市编号 某个城市最近24小时整点天气数据
在这里插入图片描述

由于省份三位编码(如福建省编码为“ABJ”)需要从省份数据获得中获得,城市编号需要从城市数据获得(如福州市编号为“58847”),所以为了获得各个城市最近24小时整点天气数据,依次爬取省份数据、城市数据、最近24小时整点数据。

2.数据爬取

由于可以直接通过访问请求URL,传回的响应的数据部分即是json格式的数据,所以只需要调用python的urllib.request, urllib.error, urllib.parse库中相关函数,对上述URL进行请求即可。不需要像平常爬取HTML网页时还需要对网页源码进行解析,查找相关数据。唯一需要注意的是,有些城市可能不存在或者全部缺失最近24小时整点数据,需要进行过滤,以免出错。

3.数据存储

虽然上一步获取的json数据可以直接存储并可使用SparkSession直接读取,但是为了方便观察数据结构、辨识异常数据、对数据增加部分提示信息,爬取后的数据进行了一些处理之后,保存成了csv格式,包括省份数据(province.csv)、城市数据(city.csv)、各个城市最近24小时整点天气数据(passed_weather_ALL.csv)。由于所有城市过去24小时整点天气数据数量太多,为了避免内存不足,每爬取50个城市的数据后,就会进行一次保存。

4.数据读取

因为各个城市最近24小时整点天气数据体量较大,每次爬取需要半小时以上,为了提高实验效率,只会进行一次数据爬取。此后会直接读取第一次实验数据。如果需要重新爬取数据,需要手动删除已有数据,即删除input文件夹下province.csv、city.csv、passed_weather_ALL.csv。

5.数据结构

最后保存的各个城市最近24小时整点天气数据(passed_weather_ALL.csv)每条数据各字段含义如下所示,这里仅列出实验中使用部分。
字段 含义
province 城市所在省份(中文)
city_index 城市序号(计数)
city_name 城市名称(中文)
city_code 城市编号
time 时间点(整点)
temperature 气温
rain1h 过去1小时降雨量

6.爬虫过程截图

开始爬虫
在这里插入图片描述
运行中的截图:

在这里插入图片描述

爬取完毕:
在这里插入图片描述

四、数据分析

数据分析主要使用Spark SQL相关知识与技术,对各个城市过去24小时累积降雨量和当日平均气温进行了计算和排序。

1.计算各个城市过去24小时累积雨量

思路:按照城市对数据进行分组,对每个城市的rain1h字段进行分组求和。
特别说明:由于获取数据所需时间较长,天气数据的时间跨度可能略有不一致,这里为了简化操作没有筛选出具有相同时间跨度的数据再进行计算。
相关步骤如下:
(1)创建SparkSession对象spark;
(2)使用spark.read.csv(filename)读取passed_weather_ALL.csv数据生成Dateframe df;
(3)对df进行操作:使用Dateframe的select方法选择province,city_name,city_code,rain1h字段,并使用Column对象的cast(dateType)方法将rain1h转成数值型,再使用Dateframe的filter方法筛选出rain1h小于1000的记录(大于1000是异常数据),得到新的Dateframe df_rain;
(4)对df_rain进行操作:使用Dateframe的groupBy操作按照province,city_name,city_code的字段分组,使用agg方法对rain1h字段进行分组求和得到新的字段rain24h(过去24小时累积雨量),使用sort方法按照rain24h降序排列,经过上述操作得到新的Dateframe df_rain_sum
(5)对df_rain_sum调用cache()方法将此前的转换关系进行缓存,提高性能
(6)对df_rain_sum调用coalesce()将数据分区数目减为1,并使用write.csv(filename)方法将得到的数据持久化到本地文件。
(7)对df_rain_sum调用head()方法取前若干条数据(即24小时累积降水量Top-N的列表)供数据可视化使用。
本部分分析对应的具体代码如下:

def passed_rain_analyse(filename):  # 计算各个城市过去24小时累积雨量
    print("begin to analyse passed rain")
    spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_rain = df.select(df['province'], df['city_name'], df['city_code'], df['rain1h'].cast(DecimalType(scale=1))) \
        .filter(df['rain1h'] < 1000)  # 筛选数据,去除无效数据
    df_rain_sum = df_rain.groupBy("province", "city_name", "city_code") \
        .agg(F.sum("rain1h").alias("rain24h")) \
        .sort(F.desc("rain24h"))  # 分组、求和、排序
    df_rain_sum.cache()
    df_rain_sum.coalesce(1).write.csv("file:///home/hadoop/PythonCode/SparkAnalysis/passed_rain_analyse.csv")
    print("end analysing passed rain")
    return df_rain_sum.head(20)

2.计算各个城市当日平均气温

思路:根据国家标准(《地面气象服务观测规范》),日平均气温取四时次数据的平均值,四时次数据为:02时、08时、14时、20时。据此,应该先筛选出各个时次的气温数据,再按照城市对数据进行分组,对每个城市的tempeature字段进行分组求平均。
特别说明:为了能获取到上述一天的四个时次的天气数据,建议在当天的20时30分后再爬取数据。
相关步骤如下:
(1)创建SparkSession对象spark;
(2)使用spark.read.csv(filename)读取passed_weather_ALL.csv数据生成Dateframe df;
(3)对df进行操作:使用Dateframe的select方法选择province,city_name,city_code,temperature字段,并使用库pyspark.sql.functions中的date_format(col,pattern)方法和hour(col)将time字段转换成date(日期)字段和hour(小时)字段,(time字段的分秒信息无用),,得到新的Dateframe df_temperature;
(4)对df_temperature进行操作:使用Dateframe的filter操作过滤出hour字段在[2,8,14,20]中的记录,经过上述操作得到新的Dateframe df_4point_temperature
(5)对df_4point_temperature进行操作:使用Dateframe的groupBy操作按照province,city_name,city_code,date字段分组,使用agg方法对temperature字段进行分组计数和求和(求和字段命名为avg_temperature),使用filter方法过滤出分组计数为4的记录(确保有4个时次才能计算日平均温),使用sort方法按照avg_temperature降序排列,再筛选出需要保存的字段province,city_name,city_code,date,avg_temperature(顺便使用库pyspark.sql.functions中的format_number(col,precision)方法保留一位小数),经过上述操作得到新的Dateframe df_avg_temperature
(6)对df_avg_temperature调用cache()方法将此前的转换关系进行缓存,提高性能
(7)对df_avg_temperature调用coalesce()将数据分区数目减为1,并使用write.json(filename)方法将得到的数据持久化到本地文件。
(8)对df_rain_sum调用collect()方法取将Dateframe转换成list,方便后续进行数据可视化。
本部分分析对应的具体代码如下:

def passed_temperature_analyse(filename):
    print("begin to analyse passed temperature")
    spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_temperature = df.select(  # 选择需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['temperature'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
        F.hour(df['time']).alias("hour")  # 得到小时数据
    )
    # 筛选四点时次
    df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 12, 20]))
    # df_4point_temperature.printSchema()
    df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("temperature"), F.avg("temperature").alias("avg_temperature")) \
        .filter("count(temperature) = 4") \
        .sort(F.asc("avg_temperature")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_temperature', 1).alias("avg_temperature"))
    df_avg_temperature.cache()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_temperature_analyse.json")
    print("end analysing passed temperature")
    return avg_temperature_list[0:10]

3.计算各个城市当日平均湿度

具体步骤与计算计算各个城市当日平均气温类似;
代码:

def passed_humidity_analyse(filename):
    print("begin to analyse passed humidity")
    spark = SparkSession.builder.master("local").appName("passed_humidity_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_humidity = df.select(  # 选择需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['humidity'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
        F.hour(df['time']).alias("hour")  # 得到小时数据
    )
    # 筛选四点时次
    df_4point_humidity = df_humidity.filter(df_humidity['hour'].isin([2, 8, 12, 20]))
    # df_4point_humidity.printSchema()
    df_avg_humidity = df_4point_humidity.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("humidity"), F.avg("humidity").alias("avg_humidity")) \
        .filter("count(humidity) = 4") \
        .sort(F.asc("avg_humidity")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_humidity', 1).alias("avg_humidity"))
    df_avg_humidity.cache()
    avg_humidity_list = df_avg_humidity.collect()
    df_avg_humidity.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_humidity_analyse.json")
    print("end analysing passed analyse")
    return avg_humidity_list[0:10]

4.计算各个城市当日平均风速

具体步骤与计算计算各个城市当日平均气温类似;
代码:

def passed_windSpeed_analyse(filename):
    print("begin to analyse passed windSpeed")
    spark = SparkSession.builder.master("local").appName("passed_windSpeed_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_windSpeed = df.select(  # 选择需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['windSpeed'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
        F.hour(df['time']).alias("hour")  # 得到小时数据
    )
    # 筛选四点时次
    df_4point_windSpeed = df_windSpeed.filter(df_windSpeed['hour'].isin([2, 8, 12, 20]))
    # df_4point_windSpeed.printSchema()
    df_avg_windSpeed = df_4point_windSpeed.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("windSpeed"), F.avg("windSpeed").alias("avg_windSpeed")) \
        .filter("count(windSpeed) = 4") \
        .sort(F.asc("avg_windSpeed")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_windSpeed', 1).alias("avg_windSpeed"))
    df_avg_windSpeed.cache()
    avg_windSpeed_list = df_avg_windSpeed.collect()
    df_avg_windSpeed.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_windSpeed_analyse.json")
    print("end analysing passed windSpeed")
    return avg_windSpeed_list[0:10]

五、数据可视化

数据可视化使用python matplotlib库。可使用pip命令安装。也可以pycharm直接安装。
在这里插入图片描述

绘制过程大体如下:
第一步,应当设置字体,这里提供了黑体的字体文件simhei.tff。否则坐标轴等出现中文的地方是乱码。
第二步,设置数据(累积雨量或者日平均气温)和横轴坐标(城市名称),配置直方图。
第三步,配置横轴坐标位置,设置纵轴坐标范围
第四步,配置横纵坐标标签
第五步,配置每个条形图上方显示的数据
第六步,根据上述配置,画出直方图。
画图部分对应的运行截图如下:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

保存的matplotlib作的图:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

六、数据以及源代码

1.爬取的数据截图:

city.csv
在这里插入图片描述

passed_weather_ALL.csv
在这里插入图片描述

province.csv
在这里插入图片描述

2.爬虫代码

# -*- coding: utf-8 -*-
import urllib.request, urllib.error, urllib.parse
import json
import csv
import os
import time
import random
import socket

class Crawler:
    def get_html(self, url):
        head = {
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36"
        }
        request = urllib.request.Request(url, headers=head)
        NET_STATUS = False
        while not NET_STATUS:
            try:
                # url = 'http://www.nmc.cn/f/rest/province'
                response = urllib.request.urlopen(request, timeout=5)
                html = response.read().decode("utf-8")
                return html
            except socket.timeout:
                print('NET_STATUS is not good')
                NET_STATUS = False

    # def get_html(self, url):  # 得到指定一个URL的网页内容
    #     head = {
    #         "User-Agent": "Mozilla/5.0(Windows NT 10.0;Win64;x64) AppleWebKit/537.36(KHTML, likeGecko) Chrome / 89.0.4389.90Safari / 537.36"
    #     }
    #     request = urllib.request.Request(url, headers=head)
    #     try:
    #         response = urllib.request.urlopen(request, timeout=5)
    #         html = response.read().decode("utf-8")
    #         # print(html)
    #     except urllib.error.URLError as e:
    #         if hasattr(e, "code"):
    #             print(e.code)
    #         if hasattr(e, "reason"):
    #             print(e.reason)
    #     return html

    def parse_json(self, url):
        obj = self.get_html(url)
        if obj:
            json_obj = json.loads(obj)
        else:
            json_obj = list()
        # print json_obj
        # for obj in json_obj:
        #     print obj
        # print chardet.detect(obj['name'])
        return json_obj
        # soup = BeautifulSoup(html_doc,"html.parser",from_encoding='utf-8')
        # links = soup.find_all('a')
        # print "all links"
        # for link in links:
        #     print link.name,link['href']

    def write_csv(self, file, data):
        if data:
            print("begin to write to " + file)
            with open(file, 'a+', newline='') as f:
                # f.write(codecs.BOM_UTF8)
                f_csv = csv.DictWriter(f, list(data[0].keys()))
                if not os.path.exists(file):
                    f_csv.writeheader()
                f_csv.writerows(data)
                # writerows()将一个二维列表中的每一个列表写为一行。
            print("end to write to " + file)

    def write_header(self, file, data):
        if data:
            print("begin to write to " + file)
            with open(file, 'a+', newline='') as f:
                # f.write(codecs.BOM_UTF8)
                f_csv = csv.DictWriter(f, list(data[0].keys()))
                f_csv.writeheader()
                f_csv.writerows(data)
            print("end to write to " + file)

    def write_row(self, file, data):
        if data:
            print("begin to write to " + file)
            with open(file, 'a+', newline='') as f:
                # f.write(codecs.BOM_UTF8)
                f_csv = csv.DictWriter(f, list(data[0].keys()))
                # f_csv.writeheader()
                f_csv.writerows(data)
            print("end to write to " + file)

    def read_csv(self, file):
        print("begin to read " + file)
        with open(file, 'r') as f:
            data = csv.DictReader(f)
            print("end to read " + file)
            return list(data)

    def get_provinces(self):
        province_file = 'input/province.csv'
        if not os.path.exists(province_file):  # 如果没有省份文件,开始爬取城市信息
            print("begin crawl province")
            provinces = self.parse_json('http://www.nmc.cn/f/rest/province')
            print("end crawl province")
            self.write_csv(province_file, provinces)
        else:
            provinces = self.read_csv(province_file)
        return provinces

    def get_cities(self):  # 获取城市
        city_file = 'input/city.csv'
        if not os.path.exists(city_file):  # 如果没有城市文件,开始爬取城市信息
            cities = list()
            print("begin crawl city")
            for province in self.get_provinces():  # 循环34个省份
                print(province['name'])
                url = province['url'].split('/')[-1].split('.')[0]
                cities.extend(self.parse_json('http://www.nmc.cn/f/rest/province/' + url))
            self.write_csv(city_file, cities)
            print("end crawl city")
        else:
            cities = self.read_csv(city_file)
        return cities

    def get_passed_weather(self, province):
        weather_passed_file = 'input/passed_weather_' + province + '.csv'
        if os.path.exists(weather_passed_file):
            return
        passed_weather = list()
        count = 0
        if province == 'ALL':
            print("begin crawl passed weather")  # 开始爬取历史天气
            for city in self.get_cities():
                print(city['province'] + ' ' + city['city'] + ' ' + city['code'])
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                        item['city_index'] = str(count)
                    passed_weather.extend(data)
                # time.sleep(random.random())
                time.sleep(0.8)
                if count % 50 == 0:
                    if count == 50:
                        self.write_header(weather_passed_file, passed_weather)
                    else:
                        self.write_row(weather_passed_file, passed_weather)
                    passed_weather = list()
            if passed_weather:
                if count <= 50:
                    self.write_header(weather_passed_file, passed_weather)
                else:
                    self.write_row(weather_passed_file, passed_weather)
            print("end crawl passed weather")
        else:
            print("begin crawl passed weather")
            select_city = [x for x in self.get_cities() if x['province'] == province]
            for city in select_city:
                print(city['province'] + ' ' + city['city'] + ' ' + city['code'])
                data = self.parse_json('http://www.nmc.cn/f/rest/passed/' + city['code'])
                if data:
                    count = count + 1
                    for item in data:
                        item['city_index'] = str(count)
                        item['city_code'] = city['code']
                        item['province'] = city['province']
                        item['city_name'] = city['city']
                    passed_weather.extend(data)
                # time.sleep(1)
            self.write_csv(weather_passed_file, passed_weather)
            print("end crawl passed weather")

    def run(self, range='ALL'):
        self.get_passed_weather(range)

def main():
    crawler = Crawler()
    crawler.run("ALL")

if __name__ == '__main__':
    main()

3.Spark分析代码:

# coding:utf-8
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DecimalType, TimestampType
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
import os
import math
from Crawler import *

def passed_rain_analyse(filename):  # 计算各个城市过去24小时累积雨量
    print("begin to analyse passed rain")
    spark = SparkSession.builder.master("local").appName("passed_rain_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_rain = df.select(df['province'], df['city_name'], df['city_code'], df['rain1h'].cast(DecimalType(scale=1))) \
        .filter(df['rain1h'] < 1000)  # 筛选数据,去除无效数据
    df_rain_sum = df_rain.groupBy("province", "city_name", "city_code") \
        .agg(F.sum("rain1h").alias("rain24h")) \
        .sort(F.desc("rain24h"))  # 分组、求和、排序
    df_rain_sum.cache()
    df_rain_sum.coalesce(1).write.csv("file:///home/hadoop/PythonCode/SparkAnalysis/passed_rain_analyse.csv")
    print("end analysing passed rain")
    return df_rain_sum.head(20)

def passed_temperature_analyse(filename):
    print("begin to analyse passed temperature")
    spark = SparkSession.builder.master("local").appName("passed_temperature_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_temperature = df.select(  # 选择需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['temperature'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
        F.hour(df['time']).alias("hour")  # 得到小时数据
    )
    # 筛选四点时次
    df_4point_temperature = df_temperature.filter(df_temperature['hour'].isin([2, 8, 12, 20]))
    # df_4point_temperature.printSchema()
    df_avg_temperature = df_4point_temperature.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("temperature"), F.avg("temperature").alias("avg_temperature")) \
        .filter("count(temperature) = 4") \
        .sort(F.asc("avg_temperature")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_temperature', 1).alias("avg_temperature"))
    df_avg_temperature.cache()
    avg_temperature_list = df_avg_temperature.collect()
    df_avg_temperature.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_temperature_analyse.json")
    print("end analysing passed temperature")
    return avg_temperature_list[0:10]

def passed_humidity_analyse(filename):
    print("begin to analyse passed humidity")
    spark = SparkSession.builder.master("local").appName("passed_humidity_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_humidity = df.select(  # 选择需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['humidity'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
        F.hour(df['time']).alias("hour")  # 得到小时数据
    )
    # 筛选四点时次
    df_4point_humidity = df_humidity.filter(df_humidity['hour'].isin([2, 8, 12, 20]))
    # df_4point_humidity.printSchema()
    df_avg_humidity = df_4point_humidity.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("humidity"), F.avg("humidity").alias("avg_humidity")) \
        .filter("count(humidity) = 4") \
        .sort(F.asc("avg_humidity")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_humidity', 1).alias("avg_humidity"))
    df_avg_humidity.cache()
    avg_humidity_list = df_avg_humidity.collect()
    df_avg_humidity.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_humidity_analyse.json")
    print("end analysing passed analyse")
    return avg_humidity_list[0:10]

def passed_windSpeed_analyse(filename):
    print("begin to analyse passed windSpeed")
    spark = SparkSession.builder.master("local").appName("passed_windSpeed_analyse").getOrCreate()
    df = spark.read.csv(filename, header=True)
    df_windSpeed = df.select(  # 选择需要的列
        df['province'],
        df['city_name'],
        df['city_code'],
        df['windSpeed'].cast(DecimalType(scale=1)),
        F.date_format(df['time'], "yyyy-MM-dd").alias("date"),  # 得到日期数据
        F.hour(df['time']).alias("hour")  # 得到小时数据
    )
    # 筛选四点时次
    df_4point_windSpeed = df_windSpeed.filter(df_windSpeed['hour'].isin([2, 8, 12, 20]))
    # df_4point_windSpeed.printSchema()
    df_avg_windSpeed = df_4point_windSpeed.groupBy("province", "city_name", "city_code", "date") \
        .agg(F.count("windSpeed"), F.avg("windSpeed").alias("avg_windSpeed")) \
        .filter("count(windSpeed) = 4") \
        .sort(F.asc("avg_windSpeed")) \
        .select("province", "city_name", "city_code", "date",
                F.format_number('avg_windSpeed', 1).alias("avg_windSpeed"))
    df_avg_windSpeed.cache()
    avg_windSpeed_list = df_avg_windSpeed.collect()
    df_avg_windSpeed.coalesce(1).write.json("file:///home/hadoop/PythonCode/SparkAnalysis/passed_windSpeed_analyse.json")
    print("end analysing passed windSpeed")
    return avg_windSpeed_list[0:10]

def draw_rain(rain_list):
    print("begin to draw the picture of passed rain")
    font = FontProperties(fname='ttf/simhei.ttf')  # 设置字体
    name_list = []
    num_list = []
    for item in rain_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(item.rain24h)
    index = [i + 0.25 for i in range(0, len(num_list))]
    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)
    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)
    plt.yticks(fontsize=20)
    plt.ylim(ymax=(int(max(num_list)+10) / 100) * 100, ymin=0)
    plt.xlabel("城市", fontproperties=font,fontsize=20)
    plt.ylabel("雨量", fontproperties=font,fontsize=20)
    plt.title("过去24小时累计降雨量全国前20名", fontproperties=font,fontsize=20)
    for rect in rects:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 1, str(height), ha="center", va="bottom",fontsize=20)
    plt.show()
    print("ending drawing the picture of passed rain")

def draw_temperature(temperature_list):
    print("begin to draw the picture of passed temperature")
    font = FontProperties(fname='ttf/simhei.ttf')
    name_list = []
    num_list = []
    date = temperature_list[0].date
    for item in temperature_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        # item.avg_temperature = item.avg_temperature.replace(',', '')
        num_list.append(float(item.avg_temperature))
        # num_list.append([float(x) for x in item.avg_temperature])
    index = [i + 0.25 for i in range(0, len(num_list))]
    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)
    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)
    plt.yticks(fontsize=20)
    plt.ylim(ymax=math.ceil(float(max(num_list)))-10, ymin=0)
    plt.xlabel("城市", fontproperties=font,fontsize=20)
    plt.ylabel("日平均气温", fontproperties=font,fontsize=20)
    plt.title(date + "全国日平均气温最低前10名", fontproperties=font,fontsize=20)
    for rect in rects:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 0.1, str(height), ha="center", va="bottom",fontsize=20)
    plt.show()
    print("ending drawing the picture of passed temperature")

def draw_humidity(humidity_list):
    print("begin to draw the picture of passed humidity")
    font = FontProperties(fname='ttf/simhei.ttf')
    name_list = []
    num_list = []
    date = humidity_list[0].date
    for item in humidity_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(float(item.avg_humidity))
    index = [i + 0.25 for i in range(0, len(num_list))]
    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)
    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)
    plt.yticks(fontsize=20)
    plt.ylim(ymax=math.ceil(float(max(num_list))), ymin=0)
    plt.xlabel("城市", fontproperties=font,fontsize=20)
    plt.ylabel("日平均湿度", fontproperties=font,fontsize=20)
    plt.title(date + "全国日平均湿度最低前10名", fontproperties=font,fontsize=20)
    for rect in rects:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 0.1, str(height), ha="center", va="bottom", fontsize=20)
    plt.show()
    print("ending drawing the picture of passed humidity")

def draw_windSpeed(windSpeed_list):
    print("begin to draw the picture of passed windSpeed")
    font = FontProperties(fname='ttf/simhei.ttf')
    name_list = []
    num_list = []
    date = windSpeed_list[0].date
    for item in windSpeed_list:
        name_list.append(item.province[0:2] + '\n' + item.city_name)
        num_list.append(float(item.avg_windSpeed))
    index = [i + 0.25 for i in range(0, len(num_list))]
    rects = plt.bar(index, num_list, color=['r', 'g', 'b', 'y'], width=0.5)
    plt.xticks([i + 0.25 for i in index], name_list, fontproperties=font,fontsize=20)
    plt.yticks(fontsize=20)
    plt.ylim(ymax=math.ceil(float(max(num_list))), ymin=0)
    plt.xlabel("城市", fontproperties=font,fontsize=20)
    plt.ylabel("日平均风速", fontproperties=font,fontsize=20)
    plt.title(date + "全国日平均风速最低前10名", fontproperties=font, fontsize=20)
    for rect in rects:
        height = rect.get_height()
        plt.text(rect.get_x() + rect.get_width() / 2, height + 0.1, str(height), ha="center", va="bottom", fontsize=20)
    plt.show()
    print("ending drawing the picture of passed windSpeed")

def main():
    sourcefile = "input/passed_weather_ALL.csv"
    if not os.path.exists(sourcefile):
        crawler = Crawler()
        crawler.run('ALL')
    # 过去24小时累计降雨量全国前20名
    rain_list = passed_rain_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)
    draw_rain(rain_list)
    # 全国日平均气温最低前10名
    temperature_list = passed_temperature_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)
    draw_temperature(temperature_list)
    # 全国日平均湿度最低前10名
    humidity_list = passed_humidity_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)
    draw_humidity(humidity_list)
    # 全国日平均风速最低前10名
    windSpeed_list = passed_windSpeed_analyse('file:///home/hadoop/PythonCode/SparkAnalysis/' + sourcefile)
    draw_windSpeed(windSpeed_list)

if __name__ == '__main__':
    main()


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/465084.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WebRTC:真正了解 RTP 和 RTCP

介绍 近年来&#xff0c;通过互联网进行实时通信变得越来越流行&#xff0c;而 WebRTC 已成为通过网络实现实时通信的领先技术之一。WebRTC 使用多种协议&#xff0c;包括实时传输协议 (RTP) 和实时控制协议 (RTCP)。 RTP负责通过网络传输音频和视频数据&#xff0c;而RTCP负责…

Uibot (RPA设计软件)RPA基础培训-财务会计Web应用自动化(批量开票机器人)

Uibot (RPA设计软件&#xff09;Mage AI智能识别&#xff08;发票识别&#xff09;———机器人的小项目友友们可以参考小北的课前材料五博客~ (本博客中会有部分课程ppt截屏,如有侵权请及请及时与小北我取得联系~&#xff09; 紧接着小北的前两篇博客&#xff0c;友友们我们…

【全面了解自然语言处理三大特征提取器】RNN(LSTM)、transformer(注意力机制)、CNN

目录 一 、RNN1.RNN单个cell的结构2.RNN工作原理3.RNN优缺点 二、LSTM1.LSTM单个cell的结构2. LSTM工作原理 三、transformer1 Encoder&#xff08;1&#xff09;position encoding&#xff08;2&#xff09;multi-head-attention&#xff08;3&#xff09;add&norm 残差链…

PyCharm实现一个简单的注册登录Django项目

之前已经实现了一个简单的Django项目&#xff0c;今天我们j基于之前的项目来实现注册、登录以及登录成功之后跳转到StuList页面。 1、连接数据库 1.1 配置数据库信息&#xff1a; 首先在myweb的settings.py 文件中设置MySQL数据库连接信息&#xff1a; DATABASES {default…

在线疫苗预约小程序|基于微信小程序的在线疫苗预约小程序设计与实现(源码+数据库+文档)

在线疫苗预约小程序目录 目录 基于微信小程序的在线疫苗预约小程序设计与实现 一、前言 二、系统设计 三、系统功能设计 1、疫苗管理 2、疫苗订单管理 3、论坛管理 4、公告管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源…

html5cssjs代码 022 表单输入类型示例

html5&css&js代码 022 表单输入类型示例 一、代码二、解释 这段HTML代码定义了一个网页&#xff0c;展示了表单输入类型示例。 一、代码 <!DOCTYPE html> <html lang"zh-cn"> <head><title>编程笔记 html5&css&js 表单输入…

SpringBoot整合JPA

一 运行效果如下 二 项目结构图 三 代码 pom.xml <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance&qu…

2024 年(第 12 届)“泰迪杯”数据挖掘挑战赛——A 题:生产线的故障自动识别与人员配置具体思路以及源代码分析

一、问题背景 随着新兴信息技术的大规模应用&#xff0c;工业生产线的智能化控制技术日益成熟。自动生产线 可以自动完成物品传送、物料填装、产品包装和质量检测等过程&#xff0c;极大地提高了生产效率和 产品质量&#xff0c;减少了生产成本。自动生产线融入故障智能报警…

【Spring Boot 源码学习】深入应用上下文初始化器实现

《Spring Boot 源码学习系列》 深入应用上下文初始化器实现 一、引言二、往期内容三、主要内容3.1 spring-boot 子模块中内置的实现类3.1.1 ConfigurationWarningsApplicationContextInitializer3.1.2 ContextIdApplicationContextInitializer3.1.3 DelegatingApplicationConte…

FFmpeg-aac、h264封装flv及时间转换

文章目录 时间概念流程api核心代码 时间概念 dts: 解码时间戳, 表示压缩帧的解码时间 pts: 显示时间戳, 表示将压缩帧解码后得到的原始帧的显示时间 时间基: time_base &#xff0c; 通常以ms为单位 时间戳: timestamp , 多少个时间基 真实时间&#xff1a;time_base * timest…

email + celery+django 异步发送邮件功能的实现

主要流程&#xff1a; django通过发件服务器到收件服务器&#xff0c;最后到收件人 邮件配置设置需要打开SMTP/IMAP并获的授权码&#xff0c;完成授权功能实现发送给收件人 邮件配置请参考另一博客https://blog.csdn.net/qq_44238024/article/details/136277821 项目结构树…

mac下Appuim环境安装

参考资料 Mac安装Appium_mac电脑安装appium-CSDN博客 安卓测试工具&#xff1a;Appium 环境安装&#xff08;mac版本&#xff09;_安卓自动化测试mac环境搭建-CSDN博客 1. 基本环境依赖 1 node.js 2 JDK&#xff08;Java JDK&#xff09; 3 Android SDK 4 Appium&#x…

数据分析 | Matplotlib

Matplotlib 是 Python 中常用的 2D 绘图库&#xff0c;它能轻松地将数据进行可视化&#xff0c;作出精美的图表。 绘制折线图&#xff1a; import matplotlib.pyplot as plt #时间 x[周一,周二,周三,周四,周五,周六,周日] #能量值 y[61,72,66,79,80,88,85] # 用来设置字体样式…

Linux进程管理:(六)SMP负载均衡

文章说明&#xff1a; Linux内核版本&#xff1a;5.0 架构&#xff1a;ARM64 参考资料及图片来源&#xff1a;《奔跑吧Linux内核》 Linux 5.0内核源码注释仓库地址&#xff1a; zhangzihengya/LinuxSourceCode_v5.0_study (github.com) 1. 前置知识 1.1 CPU管理位图 内核…

如何用Selenium通过Xpath,精准定位到“多个相同属性值以及多个相同元素”中的目标属性值

前言 本文是该专栏的第21篇,后面会持续分享python爬虫干货知识,记得关注。 相信很多同学,都有使用selenium来写爬虫项目或者自动化页面操作项目。同样,也相信很多同学在使用selenium来定位目标元素的时候,或多或少遇见到这样的情况,就是用Xpath定位目标元素的时候,页面…

Mysql主从之keepalive+MySQL高可用

一、Keepalived概述 keepalived 是集群管理中保证集群高可用的一个服务软件&#xff0c;用来防止单点故障。 keepalived 是以VRRP 协议为实现基础的&#xff0c;VRRP 全称VirtualRouter Redundancy Protocol&#xff0c;即虚拟路由冗余协议。虚拟路由冗余协议&#xff0c;可以…

launchctl及其配置、使用、示例

文章目录 launchctl 是什么Unix / Linux类似的工具有什么哪个更常用配置使用常用子命令示例加载一个 launch agent:卸载一个 launch daemon:列出所有已加载的服务:启动一个服务:停止一个服务:禁用一个服务:启用一个服务: 附com.example.myagent.plist内容有趣的例子参考 launch…

力扣L15--- 67.二进制求和(JAVA版)-2024年3月17日

1.题目描述 2.知识点 注1&#xff1a; 二进制用 %2 /2 3.思路和例子 采用竖位相加的方法 4.代码实现 class Solution {public String addBinary(String a, String b) {StringBuilder sbnew StringBuilder();int ia.length()-1;int jb.length()-1;int jinwei0;int digit1,d…

快速排序(数据结构)

1. 前言&#xff1a; 这两种排序经常使用&#xff0c;且在算法题中经常遇见。 这里我们简单分析讨论一下。 1. 快速排序 平均时间复杂度&#xff1a;O&#xff08;nlogn&#xff09; 最坏时间复杂度&#xff1a; O&#xff08;n^2&#xff09; 1.1. 左右向中遍历: 取最右侧4…

Multiplicity - 用一个键盘和鼠标控制多台电脑

Multiplicity 是一款用于多台电脑间控制的软件。通过这个工具&#xff0c;用户可以轻松地在多个计算机之间共享剪贴板、鼠标、键盘和显示屏幕。这样&#xff0c;无需每台电脑之间频繁切换&#xff0c;工作效率也会大大提高。 特征 远程PC访问 无缝控制过渡 兼容所有显示类型…