实验五 Spark Structured Streaming编程实践

一、编写程序

(1). 按照tag分组统计生成的日志数。

在新开的终端内输入 vi spark_exercise_testsyslog1.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3

from functools import partial

from pyspark.sql import SparkSession

from pyspark.sql.functions import *



if __name__ == "__main__":

    spark = SparkSession \

        .builder \

        .appName("Structuredcronlog") \

        .getOrCreate()



    lines = spark \

        .readStream \

        .format("socket") \

        .option("host", "localhost") \

        .option("port", 9988) \

        .load()



    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)

    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段

    fields = partial(

        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"

    )



    words = lines.select(

        unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),

        fields(idx=2).alias("hadooplyf316"),

        fields(idx=3).alias("tag"),

        fields(idx=4).alias("content"),

    )



# (1).  按照tag分组统计日志数。

    windowedCounts1 = words \

        .groupBy("tag") \

        .count()

        



    # 开始运行查询并在控制台输出

    query = windowedCounts1 \

        .writeStream \

        .outputMode("append") \

        .format("console") \

        .option('truncate', 'false')\

        .trigger(processingTime="3 seconds") \

        .start()



query.awaitTermination()

(2).输出所有日志内容带spark的日志。

在新开的终端内输入 vi spark_exercise_testsyslog3.py ,贴入如下代码并运行。运行之前需要关闭“tail终端”内的tail命令并重新运行tail命令,否则多次运行测试可能导致没有新数据生成。

#!/usr/bin/env python3



from functools import partial



from pyspark.sql import SparkSession

from pyspark.sql.functions import *





if __name__ == "__main__":

    spark = SparkSession \

        .builder \

        .appName("Structuredcronlog") \

        .getOrCreate()



    lines = spark \

        .readStream \

        .format("socket") \

        .option("host", "localhost") \

        .option("port", 9988) \

        .load()



    # Nov 24 13:17:01 spark CRON[18455]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)

    # 定义一个偏应用函数,从固定的pattern获取日志内匹配的字段

    fields = partial(

        regexp_extract, str="value", pattern="^(\w{3}\s*\d{1,2} \d{2}:\d{2}:\d{2}) (.*?) (.*?)\[*\d*\]*: (.*)$"

    )



    words = lines.select(

        unix_timestamp(format_string('2022 %s', fields(idx=1)), 'yy MMM d H:m:s').alias("timestamp"),

        fields(idx=2).alias("hostname"),

        fields(idx=3).alias("tag"),

        fields(idx=4).alias("content"),

    )



    # (3).  输出所有日志内容带spark的日志(根据自己模拟的日志内容进行筛选)。

    windowedCounts3 = words \

        .filter("content like '%spark%'")



    # 开始运行查询并在控制台输出

    query = windowedCounts3 \

        .writeStream \

        .outputMode("append") \

        .format("console") \

        .option('truncate', 'false')\

        .trigger(processingTime="3 seconds") \

        .start()



query.awaitTermination()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/605024.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Rust使用HashSet对Vec类型的元素进行去重

在Rust语言中,对Vec类型的元素进行去重,一种常见的方法是使用一个HashSet来帮助我们快速检查元素是否已经存在。以下是使用HashSet对Vec进行去重的示例代码: use std::collections::HashSet;fn main() {let vec_numbers vec![1, 2, 2, 3, 4…

Marin说PCB之国产电源芯片方案 ---STC2620Q

随着小米加入的造车大家庭,让这个本来就卷的要死的造车大家庭更加卷了。随之带来的蝴蝶效应就是江湖上各个造成门派都开始了降本方案的浪潮啊,开始打响价格战了。各家的新能源车企也是不得不开始启动了降本方案的计划了,为了应对降价的浪潮。…

手游广告归因新选择:Xinstall助力精准衡量投放效果

在手游市场竞争日益激烈的今天,广告主们面临着如何精准衡量广告投放效果的难题。手游广告归因平台的出现,为广告主们提供了一种全新的解决方案。而Xinstall,作为其中的佼佼者,正以其独特的优势,助力广告主们破解这一难…

【AI大模型】AI大模型热门关键词解析与核心概念入门

🚀 作者 :“大数据小禅” 🚀 文章简介 :本专栏后续将持续更新大模型相关文章,从开发到微调到应用,需要下载好的模型包可私。 🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬 目…

SPSS多元线性回归

(要满足)模型的假设条件需要对数据进行怎样处理?? 为了使数据满足多元线性回归的条件,通常需要进行以下预处理步骤: 1. 数据清洗:处理缺失值、异常值和重复值,确保数据质量。 2. 特…

python-oracledb 已率先支持 Oracle 23ai

python-oracledb 介绍 python-oracledb (以下简称 oracledb) 是 Python cx_Oracle 驱动程序的新名称,如果你仍在使用 cx_Oracle,建议升级到最新版本的 oracledb。 oracledb 驱动程序是一个开源模块,使 Python 程序能够访问 Oracle 数据库。默…

美业SaaS系统多门店收银系统源码-【卡升组合促销规则】讲解分享

美业管理系统源码 博弈美业SaaS系统 连锁多门店美业收银系统源码 多门店管理 / 会员管理 / 预约管理 / 排班管理 / 商品管理 / 促销活动 PC管理后台、手机APP、iPad APP、微信小程序 1、什么是卡升组合促销? 原价购买的卡项,卡状态正常的情况下&…

分红76.39亿,分红率再创新高,成长活力无限的伊利带来丰厚回报

伊利47万股东,又等来了一个好消息。 4月29日,伊利股份发布2023年报,实现营业总收入1261.79亿元,归母净利润104.29亿元,双创历史新高,实现连续31年稳健增长。 在递交亮眼成绩单的同时,乳业巨头伊…

MyBatis的其他查询操作

前言:在上篇博客介绍了MyBatis的一些增删改查操作,接下来介绍其他查询操作 目录 1 其他查询操作 1.1 多表查询 1.1.1 准备工作 1.1.2 数据查询 1.2 #{}和${} 1.2.1 #{}和${}使用 1.2.2 #{}和${}的区别 1.3 排序功能 1.4 like查询 2 数据库连接池 2.1 …

C++反射之检测struct或class是否实现指定函数

目录 1.引言 2.检测结构体或类的静态函数 3.检测结构体或类的成员函数 3.1.方法1 3.2.方法2 1.引言 诸如Java, C#这些语言是设计的时候就有反射支持的。c没有原生的反射支持。并且,c提供给我们的运行时类型信息非常少,只是通过typeinfo提供了有限的…

【吃透Java手写】1- Spring(上)-启动-扫描-依赖注入-初始化-后置处理器

【吃透Java手写】Spring(上)启动-扫描-依赖注入-初始化-后置处理器 1 准备工作1.1 创建自己的Spring容器类1.2 创建自己的配置类 ComponentScan1.3 ComponentScan1.3.1 Retention1.3.2 Target 1.4 用户类UserService Component1.5 Component1.6 测试类 2…

AI实景自动无人直播软件:引领直播行业智能化革命;提升直播效果,无人直播软件助力智能讲解

随着科技的快速发展,AI实景自动无人直播软件正在引领直播行业迈向智能化革命。它通过智能讲解、一键开播和智能回复等功能,为商家提供了更高效、便捷的直播体验。此外,软件还支持手机拍摄真实场景或搭建虚拟场景,使直播画面更好看…

Unity 性能优化之动态批处理(四)

提示:仅供参考,有误之处,麻烦大佬指出,不胜感激! 文章目录 前言一、动态合批是什么?二、使用动态批处理1.打开动态合批2.满足条件 三、检查动态合批是否成功五、动态合批弊端总结 前言 动态批处理是常用优…

Flutter笔记:手动配置VSCode中Dart代码自动格式化

Flutter笔记 手动配置VSCode中Dart代码自动格式化 - 文章信息 - Author: 李俊才 (jcLee95) Visit me at CSDN: https://jclee95.blog.csdn.netMy WebSite:http://thispage.tech/Email: 291148484163.com. Shenzhen ChinaAddress of this article:https://blog.csd…

pcm转MP3怎么转?只需3个步骤~

PCM(Pulse Code Modulation)是一种用于数字音频编码的基础技术,最早起源于模拟音频信号数字化的需求。通过PCM,模拟音频信号可以被精确地转换为数字形式,为数字音频的发展奠定了基础。 MP3文件格式的多个优点 MP3的优…

【深度学习】网络安全,SQL注入识别,SQL注入检测,基于深度学习的sql注入语句识别,数据集,代码

文章目录 一、 什么是sql注入二、 sql注入的例子三、 深度学习模型3.1. SQL注入识别任务3.2. 使用全连接神经网络来做分类3.3. 使用bert来做sql语句分类 四、 深度学习模型的算法推理和部署五、代码获取 一、 什么是sql注入 SQL注入是一种常见的网络安全漏洞,它允许…

模糊的图片文字,OCR能否正确识别?

拍照手抖、光线不足等复杂的环境下形成的图片都有可能会造成文字模糊,那这些图片文字对于OCR软件来说,是否能否准确识别呢? 这其中的奥秘,与文字的模糊程度紧密相连。想象一下,如果那些文字对于我们的双眼来说&#x…

sed小实践2(随手记)

删除/etc/passwd的第一个字符 #本质是利用sg替换,将第一个字符替换成空 sed s|^.||g /etc/passwd删除/etc/passwd的第二个字符 sed -r s|^(.).(.*$)|\1\2|g /etc/passwd sed -r s|^(.).|\1|g /etc/passwd删除/etc/passwd的最后一个字符 sed s|.$||g /etc/passwd删…

Java快速入门系列-11(项目实战与最佳实践)

第十一章:项目实战与最佳实践 11.1 项目规划与需求分析项目规划需求分析实例代码 11.2 系统设计考虑实例代码 11.3 代码实现与重构实例代码 11.4 性能优化与监控实例代码 11.5 部署与持续集成/持续部署(CI/CD)实例代码 11.1 项目规划与需求分析 在进行任何软件开发…

基于Vumat的修正JC本构模型的切削研究

JC渐进损伤本构是研究切削中的重要本构模型,主要包括材料硬化和损伤两部分:其中,原始JC的硬化部分本构为; 添加图片注释,不超过 140 字(可选) 材料屈服应力的硬化解耦为三部分独立的效应&#x…