文章目录
- 0 前言
- 1 课题背景
- 2 实现效果
- 3 数据收集分析过程
- **总体框架图**
- **kafka 创建日志主题**
- **flume 收集日志写到 kafka**
- **python 读取 kafka 实时处理**
- **数据分析可视化**
- 4 Flask框架
- 5 最后
0 前言
🔥 这两年开始毕业设计和毕业答辩的要求和难度不断提升,传统的毕设题目缺少创新和亮点,往往达不到毕业答辩的要求,这两年不断有学弟学妹告诉学长自己做的项目系统达不到老师的要求。
为了大家能够顺利以及最少的精力通过毕设,学长分享优质毕业设计项目,今天要分享的是
🚩 基于大数据的服务器数据分析与可视化系统
🥇学长这里给一个题目综合评分(每项满分5分)
- 难度系数:3分
- 工作量:5分
- 创新点:3分
- 界面美化:5分
1 课题背景
基于python的nginx大数据日志分析可视化,通过流、批两种方式,分析 nginx 日志,将分析结果通过 flask + echarts 进行可视化展示
2 实现效果
24 小时访问趋势
每日访问情况
客户端设备占比
用户分布
爬虫词云
3 数据收集分析过程
总体框架图
kafka 创建日志主题
# 创建主题
kafka-topics --bootstrap-server gfdatanode01:9092 --create --replication-factor 3 --partitions 1 --topic nginxlog
flume 收集日志写到 kafka
创建 flume 到 kafka 的配置文件 flume_kafka.conf,配置如下
a1.sources = s1
a1.channels = c1
a1.sinks = k1
a1.sources.s1.type=exec
a1.sources.s1.command=tail -f /var/log/nginx/access.log
a1.sources.s1.channels=c1
#设置Kafka接收器
a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
#设置Kafka地址
a1.sinks.k1.brokerList=172.16.122.23:9092
#设置发送到Kafka上的主题
a1.sinks.k1.topic=nginxlog
#设置序列化方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.channel=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=100
启动 flume
flume-ng agent -n a1 -f flume_kafka.conf
python 读取 kafka 实时处理
通过 python 实时处理 nginx 的每一条日志数据,然后写到 mysql 。
from kafka import KafkaConsumer
servers = ['172.16.122.23:9092', ]
consumer = KafkaConsumer(
bootstrap_servers=servers,
auto_offset_reset='latest', # 重置偏移量 earliest移到最早的可用消息,latest最新的消息,默认为latest
)
consumer.subscribe(topics=['nginxlog'])
for msg in consumer:
info = re.findall('(.*?) - (.*?) \[(.*?)\] "(.*?)" (\\d+) (\\d+) "(.*?)" "(.*?)" .*', msg.value.decode())
log = NginxLog(*info[0])
log.save()
数据分析可视化
-- 用户分布
select province, count(distinct remote_addr) from fact_nginx_log where device <> 'Spider' group by province;
-- 不同时段访问情况
select case when device='Spider' then 'Spider' else 'Normal' end, hour(time_local), count(1)
from fact_nginx_log
group by case when device='Spider' then 'Spider' else 'Normal' end, hour(time_local);
-- 最近7天访问情况
select case when device='Spider' then 'Spider' else 'Normal' end, DATE_FORMAT(time_local, '%Y%m%d'), count(1)
from fact_nginx_log
where time_local > date_add(CURRENT_DATE, interval - 7 day)
group by case when device='Spider' then 'Spider' else 'Normal' end, DATE_FORMAT(time_local, '%Y%m%d');
-- 用户端前10的设备
select device, count(1)
from fact_nginx_log
where device not in ('Other', 'Spider') -- 过滤掉干扰数据
group by device
order by 2 desc
limit 10
-- 搜索引擎爬虫情况
select browser, count(1) from fact_nginx_log where device = 'Spider' group by browser;
最后,通过 pandas 读取 mysql,经 ironman 进行可视化展示。
4 Flask框架
简介
Flask是一个基于Werkzeug和Jinja2的轻量级Web应用程序框架。与其他同类型框架相比,Flask的灵活性、轻便性和安全性更高,而且容易上手,它可以与MVC模式很好地结合进行开发。Flask也有强大的定制性,开发者可以依据实际需要增加相应的功能,在实现丰富的功能和扩展的同时能够保证核心功能的简单。Flask丰富的插件库能够让用户实现网站定制的个性化,从而开发出功能强大的网站。
本项目在Flask开发后端时,前端请求会遇到跨域的问题,解决该问题有修改数据类型为jsonp,采用GET方法,或者在Flask端加上响应头等方式,在此使用安装Flask-CORS库的方式解决跨域问题。此外需要安装请求库axios。
Flask框架图
相关代码
import os
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/../')
from flask import Flask, render_template
from ironman.data import SourceData
from ironman.data_db import SourceData
app = Flask(__name__)
source = SourceData()
@app.route('/')
def index():
return render_template('index.html')
@app.route('/line')
def line():
data = source.line
xAxis = data.pop('legend')
return render_template('line.html', title='24小时访问趋势', data=data, legend=list(data.keys()), xAxis=xAxis)
@app.route('/bar')
def bar():
data = source.bar
xAxis = data.pop('legend')
return render_template('bar.html', title='每日访问情况', data=data, legend=list(data.keys()), xAxis=xAxis)
@app.route('/pie')
def pie():
data = source.pie
return render_template('pie.html', title='客户端设备占比', data=data, legend=[i.get('name') for i in data])
@app.route('/china')
def china():
data = source.china
return render_template('china.html', title='用户分布', data=data)
@app.route('/wordcloud')
def wordcloud():
data = source.wordcloud
return render_template('wordcloud.html', title='爬虫词云', data=data)
if __name__ == "__main__":
app.run(host='127.0.0.1', debug=True)