目录
有界数据和无界数据
有界数据
无界数据
结构化流
基本介绍
入门案例
结构化流的编程模型
数据结构
数据源(Source)
File Source
Kafka Source(Spark 和 Kafka 整合)
整合Kafka准备工作
从kafka中读取数据
流式处理
批处理
数据写入Kafka中
流式处理
批处理
有界数据和无界数据
有界数据
数据有固定的开始和固定的结束,数据大小是固定的,我们称之为有界数据,对于有界数据,一般采取批处理方案(离线计算)
特点:
1.数据大小是固定的
2.程序处理有界数据,程序最终一定会停止
无界数据
指数据有固定的开始,但是没有固定的结束,我们称之为无界数据,对于无界数据,我们一般采用流式处理方案(实时计算)
特点:
1.数据没有明确的结束,也就是数据大小不固定
2.数据是源源不断的过来
3.程序处理无界数据,程序会一直运行,不会结束
结构化流
基本介绍
结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL ....
Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次
真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)
入门案例
需求:完成实时wordcount案例
程序流程:
代码测试操作步骤
首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据
yum -y install nc
执行nc命令, 开启端口号, 写入数据:
nc -lk 55555注意: 要先启动nc,再启动我们的程序
查看端口号是否被使用命令:
netstat -nlp | grep 要查询的端口注意事项
1.在结构化流中不能调用show()方法
2.需要使用writeStream().start()进行结果数据的输出
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder \
.config("spark.sql.shuffle.partitions", 1) \
.appName('structured_streaming_wordcount') \
.master('local[*]') \
.getOrCreate()
# 2- 数据输入
init_df = spark.readStream \
.format("socket") \
.option("host", "192.168.88.161") \
.option("port", "55555") \
.load()
# 3- 数据处理
result_df = init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').agg(
F.count('word').alias('cnt')
)
#在结构化流中不能调用show()方法
# init_df.show()
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()
结构化流的编程模型
数据结构
在结构化流中,我们可以将DataFrame称为误解的DataFrame或者无界的二维表
数据源(Source)
结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作,目前提供了如下数据源:
数据源(Source):
1.Sccket Source:网络套接字数据源,一般用于测试,也就是从网络上消费,读取数据
2.File Source:文件数据源,读取文件系统,一般用于测试,如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行
3.Kafka Source:Kafka数据源,也就是作为消费者来读取Kafka中的数据,一般用于生产环境
4.Rata Source:速率数据源,一般用于测试,通过配置参数,由结构化流自动生成测试数据
对应官网文档内容:
https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources
File Source
将目录中写入的文件作为数据流读取,文件的文件格式为:text,csv.json,orc,parquet...
相关参数:
option参数 | 参数说明 |
maxFilesPerTrigger | 每次触发时要考虑的最大新文件数(默认 no max) |
latestFirst | 是否先处理最新的新文件,当有大量文件积压时有用 |
fileNameOnly | 是否检查新文件只有文件名而不是完整路径(默认值:False)将此设置为true时,以下文件将被视为同一个文件,文件名"datadset.txt"相同 “file:///dataset.txt” “s3://a/dataset.txt " "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" |
读取代码通用格式
sparksession.readStream
.format('CSV|JSON|Text|Parquet|ORC...')
.option('参数名1','参数值1')
.option('参数名2','参数值2')
.option('参数名N','参数值N')
.schema(元数据信息)
.load('需要监听的目录地址')
针对具体数据格式,还有对应的简写API格式,例如:
sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)注意:
File Source总结
1- 只能监听目录,不能监听具体的文件
2- 可以通过*通配符的形式监听目录中满足条件的文件
3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 结构化流数据来源
print('结构化流数据来源_file_source')
# 创建SparkSession顶级对象
spark = SparkSession \
.builder \
.config('spark.sql.shuffle.partitions', 1) \
.appName("structiured_streaming_file_source") \
.master('local[*]') \
.getOrCreate()
# 数据输入
# 复杂API
init_df = spark \
.readStream \
.format('csv') \
.option('path', 'file:///export/data/pyspark_projects/04_Structured_Streaming/data') \
.option('sep', ',') \
.option('encoding', 'utf8') \
.schema("id int,name string")
init_df = spark.readStream.csv(
path='file:///export/data/pyspark_projects/04_Structured_Streaming/data/child',
schema="id int,name string",
sep=',',
encoding='utf8'
)
# 数据处理
# 数据输出
# 启动结构化流
init_df.writeStream.format('console').outputMode('append').start().awaitTermination()
Kafka Source(Spark 和 Kafka 整合)
Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表
整合Kafka准备工作
如何放置相关的Jar包?
1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
目录位置: /export/server/spark/jars
2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
目录位置:
/root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
hdfs的spark的jars目录下: hdfs://node1:8020/spark/jars
请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案: spark-submit --jars jar包路径
jar包下载地址:https://mvnrepository.com/
从kafka中读取数据
spark和kafka集成官网文档:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html
流式处理
官方提供的方案:
# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个Topic,并且指定header信息
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("includeHeaders", "true") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 结构化流数据来源
print('结构化流数据来源_file_source')
# 创建SparkSession顶级对象
spark = SparkSession \
.builder \
.config('spark.sql.shuffle.partitions', 1) \
.appName("structiured_streaming_file_source") \
.master('local[*]') \
.getOrCreate()
# 数据输入
#订阅一个topic,并且指定header信息
init_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'search-log-topic') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
# 订阅符合规则的topic,并且指定header信息
init_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribePattern', 'topic.*') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
#订阅多个topic,从最新的消息开始消费
init_df = spark \
.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'test01,test02') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
init_df.writeStream.format('console').outputMode('append').start().awaitTermination()
对接kafka后,返回的结果数据内容:
key:发送数据的key值,如果没有,就为null
value:最重要的字段,发送数据的value值,也就是消息内容,如果没有就为null
topic:表示消息从哪个Topic中消费出来
partition:分区编号,表示消费到的该条数据来源于Topic的哪个分区
offset:消息偏移量
timestamp:接收的时间戳
timestampType:时间戳类型
批处理
官方提供的方案:
# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1,"0":23从分区编号为0的分区的
offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")注意:
如果有指定startingOffsets或者endingOffsets,需要指定所有分区的offset
-1: latest,最新的地方
-2: earliest,最旧的地方
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 结构化流数据来源
print('结构化流数据来源_file_source')
# 创建SparkSession顶级对象
spark = SparkSession \
.builder \
.config('spark.sql.shuffle.partitions', 1) \
.appName("structiured_streaming_file_source") \
.master('local[*]') \
.getOrCreate()
# 数据输入
# 订阅一个topic,并且指定header信息
init_df = spark \
.read \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'search-log-topic') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.show()
print('=' * 50)
# 订阅符合规则的topic,并且指定header信息
init_df = spark \
.read \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribePattern', 'topic.*') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.show()
print('=' * 50)
# 订阅多个topic,从最新的消息开始消费
init_df = spark \
.read \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
.option('subscribe', 'test01,test02') \
.option('includeHeaders', 'true') \
.load()
elt_df = init_df.selectExpr("cast(value as string)")
# 启动流式
elt_df.show()
# 释放资源
spark.stop()
数据写入Kafka中
官方方案:
# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
流式处理
# 数据写入Kafka中
# 写出到指定Topic
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
# 创建sparksession对象
spark = SparkSession \
.builder \
.appName('spark_read_kafka_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
# 默认从最新的地方消费
init_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
.option('topic', 'test01') \
.option('subscribe', 'test01') \
.load()
# 数据处理
result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))
# 启动流式任务
result_df.writeStream.format('kafka') \
.option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
.option('topic', 'test01') \
.option("checkpointLocation", "hdfs://node1:8020/day10/chk") \
.start() \
.awaitTermination()
从数据内容中解析得到Topic,然后写入Kafka
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_multi_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribePattern","test.*")\
.load()
# 3- 数据处理
# 错误写法:缺少topic字段
# result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format("console").outputMode("append").start()
result_df.writeStream.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
.start()\
.awaitTermination()
批处理
官方给出方案:
# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_1_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.read\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribe","test02")\
.load()
# 3- 数据处理
result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.write.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("topic","test02")\
.option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
.save()