day09_实时类标签/指标

文章目录

  • day09_实时类标签/指标
  • 一、日志数据实时采集
    • 2、Flume简介
      • 2.3 项目日志数据采集Flume配置
        • 2.3.1 涉及的Flume组件和参数
        • 2.3.2 Nginx日志采集
        • 2.3.3 用户行为日志采集
  • 二、Nginx日志数据统计
    • 1、日志格式说明
    • 2、数据ETL
      • 2.1 日志抽取
        • 2.1.1 正则表达式
        • 2.1.2 基于Spark实现Nginx数据匹配
      • 2.2 字段解析
        • 2.2.1 日期格式转换
        • 2.2.2 IP解析地理位置(了解)
        • 2.2.3 UA解析
      • 2.3 完整代码
      • 2.4 使用Hive读取HDFS数据
    • 3、指标统计
    • 1、尝试进行用户行为日志的数据ETL、指标统计

day09_实时类标签/指标

在这里插入图片描述
在这里插入图片描述

一、日志数据实时采集

2、Flume简介

2.3 项目日志数据采集Flume配置

zookeeper、Kafka的启动命令

启动zookeeper(没有启动的,才需要执行)
/export/server/zookeeper/bin/zkServer.sh start

启动Kafka
cd /export/server/kafka/bin
nohup ./kafka-server-start.sh ../config/server.sql 2>&1 &

Kafka其他的相关命令
cd /export/server/kafka/bin
查看当前集群有哪些Topic
./kafka-topics.sh --list --bootstrap-server up01:9092
新建Topic(分区数没要求,副本数<=broker节点个数)
./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log
参看Topic的详细信息
./kafka-topics.sh --describe --bootstrap-server up01:9092 --topic xtzg_nginx_log

注意: 要提前创建好Kafka的Topic
2.3.1 涉及的Flume组件和参数
  • source
type: 类型,固定值TAILDIR。能同时监控一个目录或者多个文件,也能动态监控每个文件的变化,还支持断点续传,不会出现重复消费问题。
fiilegroups:  以空格分隔的文件组列表。每个文件组表示一组要跟踪的文件。
filegroups.<filegroupName>: 文件组的绝对路径。正则表达式(而不是文件系统模式)只能用于文件名。
positionFile: JSON格式的文件,记录每个文件的inode、绝对路径和最后位置。

注意: type的TAILDIR大小写不能随便写
  • channel
type: 类型,固定值 org.apache.flume.channel.kafka.KafkaChannel
kafka.bootstrap.servers: Kafka集群中的broker列表。格式:hostname:port,多个用逗号隔开。
kafka.topic: channel要用的topic
parseAsFlumeEvent: 是否需要对采集到的数据解析为Event对象,然后在内容前面增加topic前缀,会导致后续的内容会有部分缺失的情况。一般是false

补充:

如果采集到的数据最终想要输出到Kafka中,可以直接选择使用Kafka Channel。
注意: Kafka Channel和Kafka Sink,虽然都是将数据输出到Kafka中,但是两者的配置参数有区别
2.3.2 Nginx日志采集

在这里插入图片描述

  • 创建nginx_to_kafka.conf文件

在这里插入图片描述

  • nginx_to_kafka.conf配置文件内容如下
#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/workspace/user_profile/log_generate/datacollection/source_data/access-nginx.*
a1.sources.r1.positionFile = /export/data/flume/nginx_position.json


#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = up01:9092
a1.channels.c1.kafka.topic = xtzg_nginx_log
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1

注意: 
	1- a1.sources.r1.filegroups.f1该参数值要改成你自己的路径
	2- 文件的模糊匹配的正则表达式中写的是.*表示匹配任意内容
将上面的配置文件复制到/export/server/flume/conf
cp /export/data/workspace/user_profile/scripts/flume/nginx_to_kafka.conf /export/server/flume/conf
  • 在Kafka上创建topic(前提开启zk,kafka)
cd /export/server/kafka/bin

./kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_nginx_log
  • 启动Flume
cd /export/server/flume

bin/flume-ng agent -n a1 -c conf/ -f conf/nginx_to_kafka.conf
  • 查看Kafka中的数据
cd /export/server/kafka/bin

./kafka-console-consumer.sh --bootstrap-server up01:9092 --topic xtzg_nginx_log
  • 启动

运行python中的NginxLogSimulationData.py。查看kafka中数据变化,如果看到新增数据则配置成功。确认无误后关停Flume采集任务。

2.3.3 用户行为日志采集
  • 创建user_event_to_kafka.conf文件
    在这里插入图片描述

  • user_event_to_kafka.conf配置文件内容如下

#定义组件
a1.sources = r1
a1.channels = c1

#配置source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/workspace/user_profile/log_generate/datacollection/source_data/user-event.*
a1.sources.r1.positionFile = /export/data/flume/user_event_position.json


#配置channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = up01:9092
a1.channels.c1.kafka.topic = xtzg_user_event
a1.channels.c1.parseAsFlumeEvent = false

#组装 
a1.sources.r1.channels = c1
  • 在Kafka上创建topic(前提开启zk,kafka)
cd /export/server/kafka

bin/kafka-topics.sh --create --bootstrap-server up01:9092 --topic xtzg_user_event --partitions 1 --replication-factor 1
  • 启动Flume
cd /export/server/flume

bin/flume-ng agent -n a1 -c conf/ -f conf/user_event_to_kafka.conf
  • 查看Kafka中的数据
cd /export/server/kafka

bin/kafka-console-consumer.sh --bootstrap-server up01:9092 --from-beginning --topic xtzg_user_event
  • 启动

运行python中的EventSimulationJsonData.py。查看kafka中数据变化,如果看到新增数据则配置成功。确认无误后关停Flume采集任务。

二、Nginx日志数据统计

1、日志格式说明

​ Nginx(发音 恩几可使)是异步框架的网页服务器,也可以用作反向代理、负载平衡器和HTTP缓存。该软件由俄罗斯程序员伊戈尔·赛索耶夫(Игорь Сысоев)开发并于2004年首次公开发布

  • Nginx日志包含access_logerror_log两种类型日志数据。项目中分析的数据为:access_log
  • Nginx开源官网:https://nginx.org/
  • 项目采集Nginx数据格式。以下为一条Nginx日志:
116.85.48.25 - - [12/Nov/2024:11:36:46 +0800] "GET /login.html HTTP/1.1" 404 729 "https://xtx.itcast.cn/referAFriend.html" "Mozilla/5.0 (iPhone; CPU iPhone OS 14_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.18(0x17001233) NetType/WIFI Language/zh_CN" "-"


Nginx日志格式说明:
	116.85.48.25: 用户访问IP地址
	- - : 用户标识(cookie信息)
	[14/Jul/2022:17:40:41 +0800]:  访问时间 + 时区
	GET : 请求方式
	/css/40.30d6d2b.css: 请求资源
	HTTP/1.1 : 请求的协议
	500 : 请求的状态码 (500 服务器错误,  200 成功  302 重定向  404 访问到未知资源)
	951 : 响应返回的字节大小
	"https://www.htv.com/official/component?WT.mc_id=3" : 来源的URL(从那个地方跳转到此页面)
	"Mozilla/5......:  浏览器标识

2、数据ETL

2.1 日志抽取

2.1.1 正则表达式
Java版本:
(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]

Python版本:
(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"
2.1.2 基于Spark实现Nginx数据匹配

代码实现:

from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName("nginx_etl")\
        .master("local[*]")\
        .config("spark.sql.shuffle.partitions",2)\
        .getOrCreate()

    # 2- 数据输入:读取Kafka中的数据
    """
        "startingOffsets","earliest":该配置,在实际工作中一般不需要配置。这里是为了开发代码方便
    """
    init_df = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers","192.168.88.166:9092")\
        .option("subscribe","xtzg_nginx_log")\
        .option("startingOffsets","earliest")\
        .load()

    # 结构化流中不能以show()方式打印数据数据内容
    # init_df.show()

    # 3- 数据ETL处理
    # 3.1- value字段解码的操作
    """
        cast(StringType()):将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)
        
        下面两种方式都可以,推荐使用第一种,因为性能更好
    """
    # type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))
    type_cast_df = init_df.selectExpr("cast(value as string) as value")

    # 3.2- 通过正则表达式提取Nginx的字段
    pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'
    regexp_df = type_cast_df.select(
        F.regexp_extract("value",pattern,1).alias("ip"),
        F.regexp_extract("value",pattern,3).alias("datetime"),
        F.regexp_extract("value",pattern,4).alias("t1"),
        F.regexp_extract("value",pattern,5).alias("request"),
        F.regexp_extract("value",pattern,6).alias("url"),
        F.regexp_extract("value",pattern,7).alias("protocol"),
        F.regexp_extract("value",pattern,8).alias("code"),
        F.regexp_extract("value",pattern,9).alias("sendbytes"),
        F.regexp_extract("value",pattern,10).alias("refferer"),
        F.regexp_extract("value",pattern,11).alias("useragent"),
        F.regexp_extract("value",pattern,12).alias("proxyaddr")
    )

    # 4- 数据输出,启动流式任务
    regexp_df.writeStream.format("console").outputMode("append").start().awaitTermination()

运行结果截图:
在这里插入图片描述

可能遇到的错误:
在这里插入图片描述

原因: regexp_extract函数只能传递Java版的正则表达式,不能用Python的

2.2 字段解析

需求:根据nginx日志,ip标识唯一的用户,需要ip分组,统计得到用户访问的pv、uv、区域、状态码、终端设备的操作系统、设备品牌、浏览器、访问时间(年-月-日 时:分:秒)

2.2.1 日期格式转换

Python的datetime函数库

  • 相关函数:
    • strftime(): 把日期对象转成指定的时间格式的字符串
    • strptime(): 把指定格式的日期字符串转换为日期对象
  • 参考文档: https://docs.python.org/zh-cn/3/library/datetime.html#strftime-strptime-behavior
  • 解析格式: %d/%b/%Y:%H:%M:%S %z => %Y-%m-%d %H:%M:%S
    • 28/Jul/2022:16:22:07 +0800 => 日期对象 => 2022-07-28 16:22:07
  • 测试代码

Python方式

from datetime import datetime

if __name__ == '__main__':
    date_str = "11/Feb/2025:14:34:49 +0800"

    print(datetime.strptime(date_str, "%d/%b/%Y:%H:%M:%S %z").strftime("%Y-%m-%d %H:%M:%S"))

SparkSQL方式(重点掌握)

regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime","dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))

在这里插入图片描述

2.2.2 IP解析地理位置(了解)

根据IP解析地理位置

  • 方式一: 使用ip解析地理位置API
    • ip地址:http://opendata.baidu.com/api.php?query=117.136.12.79&co=&resource_id=6006&oe=utf8
    • 像百度地图开发平台 / 高德地图开放平台 … 都会提供IP解析的服务接口
    • 百度地图:https://lbs.baidu.com/faq/api?title=webapi/ip-api-base
    • 高德地图:https://lbs.amap.com/api/webservice/guide/api/ipconfig
    • 其他平台:https://www.nowapi.com/
  • 方式二: (了解)使用geo_ip依赖包和GeoLite2-City.mmdb库
    • 依赖包:geoip2~=4.5.0
    • 下载地址:https://gitcode.com/crownp/geolite2_demo/blob/master/src/main/resources/GeoLite2-City.mmdb
  • IP在线解析测试代码

Python的Requests库的介绍:https://requests.readthedocs.io/en/latest/

#!/usr/bin/env python
# @desc : 
__coding__ = "utf-8"
__author__ = "bytedance"

import requests

def parse_ip(ip_str):
    params = {
        "query": ip_str,
        "co": "",
        "resource_id": "6006",
        "oe": "utf8",
    }
    # 发送请求
    response = requests.get(url="https://opendata.baidu.com/api.php", params=params)
    # 解析响应内容
    result = response.json()

    status = result['status']
    if status == '0':
        # 正常
        try:
            return result['data'][0]['location'].split(" ")[0]
        except:
            return "未知区域"
    else:
        return "未知区域"


if __name__ == '__main__':
    ip_str = "127.0.0.1"
    ip_str = "10.254.1.97"
    ip_str = "157.148.69.76"

    area = parse_ip(ip_str)

    print(area)
2.2.3 UA解析

UA说明

  • UA为useragent简称,特指用户访问系统使用的客户端信息,一般包含操作系统,浏览器,设备品牌信息等
  • UA字符串信息:http://useragentstring.com/
  • 使用,需导入UA解析依赖包:from user_agents import parse
  • UA的作用
    • 1.客户端识别:通过User-Agent,服务器能够识别客户端的类型和版本,从而提供相应的内容和服务。比如,在移动设备上展示适合屏幕大小的网页布局,或在不同浏览器上提供兼容性优化。
    • 2.统计分析:网站和应用开发者可以利用User-Agent来收集客户端的信息,进行用户行为分析和统计。这有助于了解用户使用的设备和偏好,以便进行产品和服务的改进。
    • 3.安全性:User-Agent也可以用于安全验证和防止恶意行为。通过分析User-Agent,服务器可以检测到异常或伪造的请求,并采取相应的安全措施。
  • 测试代码:
from user_agents import parse

if __name__ == '__main__':
    ua_str = "Mozilla/5.0 (iPhone; CPU iPhone OS 13_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0 MQQBrowser/11.0.7 Mobile/15E148 Safari/604.1 QBWebViewUA/2 QBWebViewType/1 WKType/1"

    result = parse(ua_str)

    # os操作系统信息
    print("os----------")
    print(result.os.family)
    print(result.os.version)
    print(result.os.version_string)

    # brower浏览器信息
    print("browser----------")
    print(result.browser.family)
    print(result.browser.version)
    print(result.browser.version_string)

    # device设备信息
    print("device----------")
    print(result.device.family)
    print(result.device.model)

2.3 完整代码

需要将结果数据同时写入到Kafka和HDFS。清洗后的日志,可以用于其他业务分析,具有一定的价值。因为Kafka不能永久保存数据,所以需要把数据存储到HDFS一份。

因为每天都有很多日志,所以需要对日志进行分区。可以通过partitionBy()方法进行分区写入到HDFS。分区的字段需要进行计算。

另外,为了减少小文件生成,可以使用trigger来指定写入的时间间隔。

  • 先创建Kafka的Topic
cd /export/server/kafka/bin
./kafka-topics.sh --create --bootstrap-server up01:9092 --topic dwd_nginx_etl_result
  • 完整代码
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType, MapType
import requests
from user_agents import parse

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName("nginx_etl")\
        .master("local[*]")\
        .config("spark.sql.shuffle.partitions",2)\
        .getOrCreate()

    # 配置checkpointLocation路径,推荐使用HDFS路径
    spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://192.168.88.166:8020/xtzg/chk")

    # 2- 数据输入:读取Kafka中的数据
    """
        "startingOffsets","earliest":该配置,在实际工作中一般不需要配置。这里是为了开发代码方便
    """
    init_df = spark.readStream.format("kafka")\
        .option("kafka.bootstrap.servers","192.168.88.166:9092")\
        .option("subscribe","xtzg_nginx_log")\
        .option("startingOffsets","earliest")\
        .load()

    # 结构化流中不能以show()方式打印数据数据内容
    # init_df.show()

    # 3- 数据ETL处理
    # 3.1- value字段解码的操作
    """
        cast(StringType()):将字段数据类型强制转换为字符串。等同于SQL语句中的cast(value as string)
        
        下面两种方式都可以,推荐使用第一种,因为性能更好
    """
    # type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))
    type_cast_df = init_df.selectExpr("cast(value as string) as value")

    # 3.2- 通过正则表达式提取Nginx的字段
    pattern = '(?<ip>\d+\.\d+\.\d+\.\d+) (- - \[)(?<datetime>[\s\S]+)(?<t1>\][\s"]+)(?<request>[A-Z]+) (?<url>[\S]*) (?<protocol>[\S]+)["] (?<code>\d+) (?<sendbytes>\d+) ["](?<refferer>[\S]*) ["](?<useragent>[\S\s]+)["] ["](?<proxyaddr>[\S\s]+)["]'

    # 这里不允许使用Python正则表达式,只能使用Java正则表达式
    # pattern = '(?P<ip>.*?) - - \[(?P<time>.*?)\] "(?P<request>.*?)" (?P<status>.*?) (?P<bytes>.*?) "(?P<referer>.*?)" "(?P<ua>.*?)" "(?P<proxy_address>.*)"'
    regexp_df = type_cast_df.select(
        F.regexp_extract("value",pattern,1).alias("ip"),
        F.regexp_extract("value",pattern,3).alias("datetime"),
        F.regexp_extract("value",pattern,4).alias("t1"),
        F.regexp_extract("value",pattern,5).alias("request"),
        F.regexp_extract("value",pattern,6).alias("url"),
        F.regexp_extract("value",pattern,7).alias("protocol"),
        F.regexp_extract("value",pattern,8).alias("code"),
        F.regexp_extract("value",pattern,9).alias("sendbytes"),
        F.regexp_extract("value",pattern,10).alias("refferer"),
        F.regexp_extract("value",pattern,11).alias("useragent"),
        F.regexp_extract("value",pattern,12).alias("proxyaddr")
    )

    # 3.3- 日期时间格式转换
    datetime_df = regexp_df.withColumn("datetime",F.from_unixtime(F.unix_timestamp("datetime","dd/MMM/yyyy:HH:mm:ss Z"),"yyyy-MM-dd HH:mm:ss"))

    # 3.4- IP地理位置解析
    @F.udf(returnType=StringType())
    def parse_ip(ip_str):
        params = {
            "query": ip_str,
            "co": "",
            "resource_id": "6006",
            "oe": "utf8",
        }
        # 发送请求
        response = requests.get(url="https://opendata.baidu.com/api.php", params=params)
        # 解析响应内容
        result = response.json()

        status = result['status']
        if status == '0':
            # 正常
            try:
                return result['data'][0]['location'].split(" ")[0]
            except:
                return "未知区域"
        else:
            return "未知区域"

    area_df = datetime_df.withColumn("area",parse_ip("ip"))

    # 3.5- UA解析
    """
        为什么这里用户自定义函数推荐返回字典?
        方便后续取值
    """
    @F.udf(returnType=MapType(keyType=StringType(),valueType=StringType()))
    def parse_ua(ua_str):
        result = parse(ua_str)
        os = result.os.family
        browser = result.browser.family
        device = result.device.model
        return {"os":os,"browser":browser,"device":device}

    ua_df = area_df.withColumn("os",parse_ua("useragent")['os'])\
        .withColumn("browser", parse_ua("useragent")['browser'])\
        .withColumn("device", parse_ua("useragent")['device'])

    # 4- 数据输出,启动流式任务
    # 4.1- 输出到HDFS
    # 新增一个分区字段
    dt_df = ua_df.withColumn("dt",F.split("datetime"," ")[0])

    # partitionBy表示按照哪个字段进行分区
    dt_df.writeStream.format("orc").partitionBy("dt")\
        .option("path","hdfs://192.168.88.166:8020/xtzg/etl/dwd_nginx_etl_result")\
        .start()

    # 4.2- 输出到Kafka
    # 注意:一般将数据内容转换为JSON格式输出到Kafka中,为了后续使用方便
    # 注意:输出到Kafka中的字段名称只能叫value
    kafka_df = ua_df.select(
        F.to_json(F.struct("ip","datetime","t1","request","url","protocol","code","sendbytes","refferer","useragent","proxyaddr","area","os","browser","device")).alias("value")
    )

    kafka_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","192.168.88.166:9092")\
        .option("topic","dwd_nginx_etl_result")\
        .start()

    # 4.3- 输出到控制台(为了测试)
    # awaitTermination()只能加在最后一个start()的后面
    dt_df.writeStream.format("console").outputMode("append").start().awaitTermination()

可能遇到的错误一:
在这里插入图片描述

原因: 结构化流中将数据输出到文件系统中,需要配置checkpointLocation

可能遇到的错误二:
在这里插入图片描述

原因: 输出到Kafka中的字段名称只能叫value

2.4 使用Hive读取HDFS数据

  • 创建表
CREATE external TABLE dwd.dwd_nginx_etl_result (
    ip string,
    `datetime` string,
    t1 string,
    request string,
    url string,
    protocol string,
    code string,
    sendbytes string,
    refferer string,
    useragent string,
    proxyaddr string,
    area string,
    os string,
    browser string,
    device string
)
    COMMENT 'nginx日志'
    PARTITIONED BY (dt string)
    STORED AS ORC
    LOCATION '/xtzg/etl/dwd_nginx_etl_result'
    TBLsql ('orc.compress' = 'SNAPPY')
;
  • 同步分区
MSCK REPAIR TABLE dwd.dwd_nginx_etl_result;

3、指标统计

  • 需求
统计实时请求总数(pv)
统计用户数(uv)
统计用户访问所在区域省(类似抖音的位置显示)
统计用户响应状态码
统计用户使用设备终端信息
统计用户操作系统信息
统计用户首次访问系统的时间
统计用户末次访问系统的时间

ip: 用户访问系统的唯一地址
pv:访问系统的页面次数
uv:访问系统的用户数
area:访问系统用户来自的区域,根据ip解析出地址位置
status_code:访问系统用户请求http协议响应状态码
device_os:设备终端,从ua中提取手机或电脑的系统
device_brand:设备品牌名称,从ua中提取手机或电脑的品牌
browser_name:访问系统用户使用的浏览器名称
first_access_time:用户首次访问系统的时间
last_access_time:用户首次访问系统的时间
  • Doris建表语句

使用unique模型。

CREATE DATABASE IF NOT EXISTS log_analysis_db;
CREATE TABLE IF NOT EXISTS log_analysis_db.nginx_log_result
(
    ip varchar(15) comment 'ip地址',
    pv int comment 'pv数',
    uv int comment 'uv数',
    area varchar(50) comment '用户所在区域,根据ip解析',
    status_code varchar(10) comment '请求响应状态码',
    device_os varchar(50) comment '设备系统,从ua中提取手机或电脑使用的系统',
    device_brand varchar(50) comment ',从ua中提取手机或电脑的品牌',
    browser_name varchar(50) comment '电脑和手机,使用浏览器,记录浏览器简称',
    first_access_time datetime comment 'nginx日志记录首次访问时间',
    last_access_time datetime comment 'nginx日志记录末次访问时间'
)
UNIQUE KEY(ip)
DISTRIBUTED BY HASH(ip) BUCKETS 10
sql("replication_num" = "1");
  • 完整代码
from pyspark.sql import SparkSession, DataFrame
import os
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder \
        .appName("nginx_analysis") \
        .master("local[*]") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    # 2- 数据输入:读取Kafka中的数据
    init_df = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", "192.168.88.166:9092") \
        .option("subscribe", "dwd_nginx_etl_result") \
        .option("startingOffsets", "earliest") \
        .load()

    # 3- 数据处理
    # 3.1- value字段类型转换
    type_cast_df = init_df.select(init_df.value.cast(StringType()).alias("value"))

    # 3.2- 从JSON中提取一个个字段
    """
        json_tuple与get_json_object的区别
            get_json_object:
                优点:同时能够解析嵌套的JSON
                缺点:一次只能得到一个字段
            json_tuple:
                优点:一次能得到多个字段
                缺点:针对嵌套JSON,只能一层层解析
    """
    parse_json_df = type_cast_df.select(
        F.json_tuple("value","ip","datetime","code","area","os","browser","device")\
            .alias("ip","datetime","status_code","area","device_os","browser_name","device_brand")
    )

    # 3.3- 指标统计
    # F.lit(1)生成一列,每行的数据内容一样,全都是1。与F.col函数作用类似
    # 因为类似area的这些字段的数据类型是字符串,聚合函数没有太适合的,因此使用first
    result_df = parse_json_df.groupBy("ip").agg(
        F.count("ip").alias("pv"),
        F.lit(1).alias("uv"),
        F.first("area").alias("area"),
        F.first("status_code").alias("status_code"),
        F.first("device_os").alias("device_os"),
        F.first("device_brand").alias("device_brand"),
        F.first("browser_name").alias("browser_name"),
        F.min("datetime").alias("first_access_time"),
        F.max("datetime").alias("last_access_time")
    )

    # 4- 数据输出
    # 4.1- 输出到Doris
    def write_2_doris(batch_df:DataFrame, batch_id):
        """
            将DataFrame输出到Doris中
        :param batch_df: 有界的DataFrame
        :param batch_id: 批次ID
        :return:
        """
        # 注意:一般先用append。如果明确知道要怎么做,那可以再使用overwrite
        batch_df.write.jdbc(
            url="jdbc:mysql://192.168.88.166:9030/log_analysis_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
            table="nginx_log_result",
            mode="append",
            sql={ 'user' : 'root', 'password' : '123456' }
        )


    result_df.writeStream.foreachBatch(write_2_doris).outputMode("update").start()

    # 4.2- 输出到控制台
    result_df.writeStream.format("console").outputMode("update").start().awaitTermination()
  • 结果数据核对
./kafka-console-producer.sh --broker-list up01:9092 --topic dwd_nginx_etl_result
{"ip":"210.27.147.62","cookie":"- - [","datetime":"2024-11-14 11:11:11","t1":"] \"","request":"GET","url":"/search.html","protocol":"HTTP/1.1","code":"401","sendbytes":"58840","refferer":"https://www.douyin.com/goods-recommend/search.html?keyword=美味\"","useragent":"Mozilla/5.0 (Linux; U; Android 9; zh-CN; MI 9 Build/PKQ1.181121.001) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/78.0.3904.108 UCBrowser/13.1.6.1096 Mobile Safari/537.36","proxyaddr":"-","area":"广东省广州市","os":"Android","browser":"UC Browser","device":"XiaoMi MI 9","dt":"2024-11-12"}

1、尝试进行用户行为日志的数据ETL、指标统计

提示:核心是如何解析JSON格式,得到一个个独立的字段

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/971157.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

硬件学习笔记--41 电磁兼容试验-5 射频场感应的传导干扰试验介绍

目录 电磁兼容试验-射频场感应的传导干扰试验介绍 1.试验目的 2.试验方法 3.判定依据及意义 电磁兼容试验-射频场感应的传导干扰试验介绍 驻留时间是在规定频率下影响量施加的持续时间。被试设备&#xff08;EUT&#xff09;在经受扫频频带的电磁影响量或电磁干扰的情况下&a…

告别卡关!XSS挑战之旅全关卡通关思路详解

XSS挑战之旅 XSS测试思路Level1Level2Level3Level4Level5Level6Level7Level8Level9Level10Level11Level12Level13Level14Level15Level16Level17Level18Level19Level20免责声明&#xff1a; XSS测试思路 确定输入输出点&#xff1a; 寻找URL参数、表单输入、HTTP头&#xff08;R…

连锁企业管理系统的五大核心功能

连锁管理系统对于连锁企业的运营和发展至关重要&#xff0c;以下以核货宝连锁管理系统为例&#xff0c;介绍其五大核心功能&#xff1a; 门店管理功能 门店信息管理&#xff1a;核货宝连锁管理系统可集中管理所有门店的详细信息&#xff0c;包括门店地址、联系方式、营业时间、…

【第12章:深度学习与伦理、隐私—12.4 深度学习与伦理、隐私领域的未来挑战与应对策略】

凌晨三点的自动驾驶测试场,AI系统突然在暴雨中做出惊人决策——它选择撞向隔离带而不是紧急变道,因为算法推演发现隔离带后的应急车道站着五个工程师。这个惊悚的伦理困境,揭开了深度学习伦理危机最尖锐的冰山一角。 一、潘多拉魔盒已开:深度学习伦理的四大原罪 1.1 数据原…

深度学习(1)-简单神经网络示例

我们来看一个神经网络的具体实例&#xff1a;使用Python的Keras库来学习手写数字分类。在这个例子中&#xff0c;我们要解决的问题是&#xff0c;将手写数字的灰度图像&#xff08;28像素28像素&#xff09;划分到10个类别中&#xff08;从0到9&#xff09;​。我们将使用MNIST…

腿足机器人之八- 腿足机器人动力学

腿足机器人之八- 腿足机器人动力学 刚体动力学接触动力学与地面交互稳定性判据ZMP(零力矩点)CoM(Center of Mass)捕获点 简化动力学模型双足机器人走路与小跑的动力学对比挑战与前沿技术 腿足机器人的运动学解决“如何到达目标位置”的问题&#xff0c;动力学解决“如何高效稳定…

Kubernetes控制平面组件:etcd高可用集群搭建

云原生学习路线导航页&#xff08;持续更新中&#xff09; kubernetes学习系列快捷链接 Kubernetes架构原则和对象设计&#xff08;一&#xff09;Kubernetes架构原则和对象设计&#xff08;二&#xff09;Kubernetes架构原则和对象设计&#xff08;三&#xff09;Kubernetes控…

HCIA项目实践--静态路由的拓展配置

7.7 静态路由的拓展配置 网络中的两个重要思想&#xff1a; &#xff08;1&#xff09; 实的不行来虚的&#xff1b; &#xff08;2&#xff09; 范围太大&#xff0c;划分范围。&#xff08;分治&#xff09; 7.7.1 负载均衡 &#xff08;1&#xff09;定义 负载均衡是一种网…

node.js + html调用ChatGPTApi实现Ai网站demo(带源码)

文章目录 前言一、demo演示二、node.js 使用步骤1.引入库2.引入包 前端HTML调用接口和UI所有文件总结 前言 关注博主&#xff0c;学习每天一个小demo 今天是Ai对话网站 又到了每天一个小demo的时候咯&#xff0c;前面我写了多人实时对话demo、和视频转换demo&#xff0c;今天…

类和对象(5)——抽象类和接口

目录 1. 抽象类 1.1 抽象类的概念 1.2 抽象类语法&#xff1a;abstract关键字 1.3 抽象类的特性 1.4 抽象类的作用 2. 接口 2.1 接口的概念 2.2 接口语法&#xff1a;interface关键字 2.3 接口的实现&#xff1a;implements关键字 2.4 接口的特性 2.5 实现多个接口 …

kubectl exec 实现的原理

kubectl exec 是 Kubernetes 提供的一个命令&#xff0c;它允许你在指定的 Pod 中执行命令&#xff0c;类似于在容器中打开一个终端会话。这个功能对于调试、监控和管理容器化应用非常有用。kubectl exec 的实现涉及到多个 Kubernetes 组件和机制&#xff0c;包括 API Server、…

【ubuntu24.04】 强制重启导致大模型的磁盘挂载出错

挂载NTFS文件系统出错 各种模型放在了这个机械硬盘上&#xff0c;虽然速度慢&#xff0c;但是好在容量大。大模型在工作&#xff0c;但是程序看起来有问题&#xff0c;导致系统卡死了&#xff0c;然后我重启了&#xff0c;然后报错&#xff1a;wrong fs type bad option &…

【数据结构】 栈和队列

在计算机科学的世界里&#xff0c;数据结构是构建高效算法的基础。栈&#xff08;Stack&#xff09;和队列&#xff08;Queue&#xff09;作为两种基本且重要的数据结构&#xff0c;在软件开发、算法设计等众多领域都有着广泛的应用。今天&#xff0c;我们就来深入探讨一下栈和…

移动端测试的挑战与解决方案:兼容性、网络问题及实战策略

引言 移动应用已成为用户触达服务的核心入口,但移动端测试面临设备多样性、网络波动、用户场景复杂等多重挑战。据Statista统计,2023年全球活跃移动设备超180亿台,操作系统(Android/iOS)版本碎片化率超30%,这对测试工程师提出了极高要求。本文深度解析移动端测试的核心痛…

【设计模式】03-理解常见设计模式-行为型模式(专栏完结)

前言 前面我们介绍完创建型模式和创建型模式&#xff0c;这篇介绍最后的行为型模式&#xff0c;也是【设计模式】专栏的最后一篇。 一、概述 行为型模式主要用于处理对象之间的交互和职责分配&#xff0c;以实现更灵活的行为和更好的协作。 二、常见的行为型模式 1、观察者模…

matlab欠驱动船舶模型预测控制

1、内容简介 matlab135-欠驱动船舶模型预测控制 可以交流、咨询、答疑 2、内容说明 略 针对在风 、 浪 、 流时变干扰下欠驱动水面船舶的轨迹跟踪控制问题 &#xff0c; 设计了一种基于模型 预测控制的轨迹跟踪控制器 &#xff0e; 考虑到欠驱动船舶在没有横向驱动力情况下…

计算机性能与网络体系结构探讨 —— 基于《计算机网络》谢希仁第八版

(꒪ꇴ꒪ )&#xff0c;Hello我是祐言QAQ我的博客主页&#xff1a;C/C语言&#xff0c;数据结构&#xff0c;Linux基础&#xff0c;ARM开发板&#xff0c;网络编程等领域UP&#x1f30d;快上&#x1f698;&#xff0c;一起学习&#xff0c;让我们成为一个强大的攻城狮&#xff0…

Linux上安装jdk1.8和配置环境变量

步骤一&#xff1a;&#xff1a;创建jdk安装目录(该/usr/local/ ,最好把我们自己下载的放到这,容易区分) 可以省略 步骤二&#xff1a;查看安装程序 [rootVM_0_4_centos src]# rpm -qa | grep -i jdk 若之前安装过jdk&#xff0c;下次安装一定把之前的删除干净 下载地址链接…

【Spring+MyBatis】留言墙的实现

目录 1. 添加依赖 2. 配置数据库 2.1 创建数据库与数据表 2.2 创建与数据库对应的实体类 3. 后端代码 3.1 目录结构 3.2 MessageController类 3.3 MessageService类 3.4 MessageMapper接口 4. 前端代码 5. 单元测试 5.1 后端接口测试 5.2 使用前端页面测试 在Spri…

Windows环境安装部署minimind步骤

Windows环境安装部署minimind步骤 必要的软件环境 git git&#xff0c;可下载安装版&#xff0c;本机中下载绿色版&#xff0c;解压到本地目录下&#xff08;如&#xff1a;c:\soft\git.win64&#xff09;&#xff0c;可将此路径添加到PATH环境变量中&#xff0c;供其他程序…