结构化编程模型
输出终端/位置
默认情况下,Spark的结构化流支持多种输出方案:
1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式
2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式
3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式
4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式
5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式
File Sink
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('file_sink')\
.master('local[1]')\
.getOrCreate()
# 输出到文件必须要设置checkpointLocation检查点路径
spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://node1:8020/chk")
# 2- 数据输入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
# 3- 数据处理
# 4- 数据输出
# 5- 启动流式任务
"""
File Sink总结:
1- 输出到文件必须要设置checkpointLocation检查点路径
2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
3- 我们可以使用触发器trigger来指定批处理的时间间隔,用来减少小文件的产生
"""
init_df.writeStream\
.format("csv")\
.outputMode("append")\
.option("sep",",")\
.option("header","True")\
.option("encoding","UTF-8")\
.trigger(processingTime="20 seconds")\
.start("file:///export/data/child")\
.awaitTermination()
file sink总结:
1- 要设置检查点数据存放路径。否则会报如下错误
AnalysisException: checkpointLocation must be specified either through
2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
3- 可以通过触发器trigger解决小文件的问题。可以通过触发器来调整每一批次产生间隔的时间
4- 支持输出到本地文件系统和HDFS文件系统
foreach sink
允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。
使用方式主要有二种
方式一:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('foreach_sink')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
# 3- 数据处理
# 4- 数据输出
# 5- 启动流式任务
"""
每输入一条数据都会调用foreach中的函数。
取Row中某个字段的值:
方式一 row.字段名
方式二 row['字段名']
"""
def my_foreach_func(row):
# 打开数据库连接
# 存储到数据库中
print(row,row.value)
print(row['value'])
# 关闭数据库连接
init_df.writeStream.foreach(my_foreach_func).outputMode("append").start().awaitTermination()
方式二:这种方式的适用场景是需要和资源打交道的情况(例如:连接和关闭数据库、打开关闭文件等)。通过open和close来处理资源,通过process来对数据进行自定义的处理
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('foreach_sink')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
# 3- 数据处理
# 4- 数据输出
# 5- 启动流式任务
# 自定义类
"""
foreach sink方式二
1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
4- 一般会和trigger一起配合使用,用来减少消耗资源的操作
"""
class MyForeachFunc:
def open(self,partition_id, epoch_id):
print(f"partition_id={partition_id},epoch_id={epoch_id}")
# 打开数据库连接
print("打开数据库连接")
return True
def process(self,row):
# 存储到数据库中
print(row, row.value)
def close(self, error):
# 关闭数据库连接
print("关闭数据库连接")
init_df.writeStream\
.foreach(MyForeachFunc())\
.trigger(processingTime="20 seconds")\
.outputMode("append")\
.start()\
.awaitTermination()
说明:
open: 在一个批次中只会调用一次。返回值是bool类型,当返回True的时候,process方法才会被调用
process: 会被调用多次,该批次内有多少行数据,就会被调用多少次
close: 在一个批次中只会调用一次。用来关闭在open打开的资源
1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
4- 一般会和trigger一起配合使用,用来减少消耗资源的操作
foreachBatch Sink
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('foreach_sink')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.readStream\
.format("socket")\
.option("host","192.168.88.161")\
.option("port","55555")\
.load()
# 3- 数据处理
# 4- 数据输出
# 5- 启动流式任务
"""
batch_df:有界的DataFrame,可以调用show()方法
batch_id:批次ID
"""
def my_foreach_batch_func(batch_df, batch_id):
batch_df.show()
print(batch_id)
init_df.writeStream.foreachBatch(my_foreach_batch_func).outputMode("append").start().awaitTermination()
说明: process_fun(batch_df, batch_id)
batch_df: 该批次中的数据形成的有界DataFrame
batch_id: 批次的编号
设置触发器Trigger
触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据
主要提供如下几种触发器:
- 1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短(常用)
result_df.writeStream.foreachBatch(func)\
.outputMode('append')\
.start()\
.awaitTermination()
- 2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理**(常用,推荐使用)**
result_df.writeStream.foreachBatch(func)\
.outputMode('append')\
.trigger(processingTime='5 seconds')\
.start()\
.awaitTermination()
情形说明:
1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据
2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待
3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
- 3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等
result_df.writeStream.foreachBatch(func)\
.outputMode('append')\
.trigger(once=True)\
.start()\
.awaitTermination()
Checkpoint检查点目录设置
设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题
检查点目录主要包含以下几个目录位置:
1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录
2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次
3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id
4-数据源sources: 是数据源(Source)各个批次的读取的详情
5-数据接收端sinks: 是数据接收端各个批次的写出的详情
6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态
如何设置检查点:
1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
2- option("checkpointLocation", "检查点路径")
推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径
Spark和Kafka整合
从kafka中读取数据
spark和kafka集成官网文档:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html
流式处理
官方给出方案:
# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 订阅一个Topic,并且指定header信息
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.option("includeHeaders", "true") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
从某一个Topic中读取消息数据:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_1_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribe","test01")\
.load()
# 3- 数据处理
result_df1 = init_df.select(F.expr("cast(value as string) as value"))
# selectExpr = select + F.expr
result_df2 = init_df.selectExpr("cast(value as string) as value")
result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))
# 4- 数据输出
# 5- 启动流式任务
"""
如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
"""
result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()
按规则订阅Topic:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_multi_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribePattern","test.*")\
.load()
# 3- 数据处理
result_df1 = init_df.select("topic",F.expr("cast(value as string) as value"))
# selectExpr = select + F.expr
result_df2 = init_df.selectExpr("topic","cast(value as string) as value")
result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))
# 4- 数据输出
# 5- 启动流式任务
"""
如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
"""
result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()
对接kafka后,返回的结果数据内容:
key: 发送数据的key值。如果没有,就为null
value: 最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
topic: 表示消息是从哪个Topic中消费出来
partition: 分区编号。表示消费到的该条数据来源于Topic的哪个分区
offset: 表示消息偏移量
timestamp: 接收的时间戳
timestampType: 时间戳类型(无意义)
类型的说明:
列名 | 类型 |
---|---|
key | binary |
value | binary |
topic | string |
partition | int |
offset | long |
timestamp | timestamp |
timestampType | int |
headers (optional) | array |
批处理
官方给出方案:
# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1,"0":23从分区编号为0的分区的
offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
订阅一个Topic:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('sparksql_read_kafka_1_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从Topic开头一直消费到结尾
init_df = spark.read\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribe","test02")\
.load()
# 3- 数据处理
result_df1 = init_df.select(F.expr("cast(value as string) as value"))
# selectExpr = select + F.expr
result_df2 = init_df.selectExpr("cast(value as string) as value")
result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))
# 4- 数据输出
print("result_df1")
result_df1.show()
print("result_df2")
result_df2.show()
print("result_df3")
result_df3.show()
# 5- 释放资源
spark.stop()
指定startingOffsets参数:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('sparksql_read_kafka_multi_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从Topic开头一直消费到结尾
# 对每个分区指定具体消费的offset,了解即可。实际工作很少用
# init_df = spark.read\
# .format("kafka")\
# .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
# .option("subscribe","test02,test03")\
# .option("startingOffsets","""{"test02":{"0":1}}""")\
# .load()
init_df = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
.option("subscribe", "test02,test03") \
.option("startingOffsets", "earliest") \
.load()
# 3- 数据处理
result_df1 = init_df.select(F.expr("cast(value as string) as value"))
# selectExpr = select + F.expr
result_df2 = init_df.selectExpr("cast(value as string) as value")
result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))
# 4- 数据输出
print("result_df1")
result_df1.show()
print("result_df2")
result_df2.show()
print("result_df3")
result_df3.show(n=100)
# 5- 释放资源
spark.stop()
可能遇到的错误:
原因: 如果有指定startingOffsets或者endingOffsets,需要指定所有分区的offset
-1: latest,最新的地方
-2: earliest,最旧的地方
必备参数:
选项 | 值 | 说明 |
---|---|---|
assign | 通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]} | 设置使用特定的TopicPartitions |
subscribe | 以逗号分隔的Topic主题列表 | 要订阅的主题列表 |
subscribePattern | 正则表达式字符串 | 订阅匹配符合条件的Topic。assign、subscribe、subscribePattern任意指定一个。 |
kafka.bootstrap.servers | 以英文逗号分隔的host:port列表 | 指定kafka服务的地址 |
数据写入Kafka中
流式处理
官方给出方案:
# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
写出到指定Topic
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_1_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribe","test01")\
.load()
# 3- 数据处理
result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("topic","test02")\
.option("checkpointLocation", "hdfs://node1:8020/chk")\
.start()\
.awaitTermination()
从数据内容中解析得到Topic,然后写入Kafka:
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_multi_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribePattern","test.*")\
.load()
# 3- 数据处理
# 错误写法:缺少topic字段
# result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.writeStream.format("console").outputMode("append").start()
result_df.writeStream.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("checkpointLocation", "hdfs://node1:8020/chk")\
.start()\
.awaitTermination()
可能遇到的错误一:
原因: 当从数据中解析得到Topic信息的时候,最终输出到Kafka的那个DataFrame中必须要有topic
可能遇到的错误二:
原因: 输出到Kafka中的数据,需要命名value。而且数据类型需要是string或者binary(二进制)
备注 | Column | 数据类型 |
---|---|---|
可选字段 | key | string or binary |
必填字段 | value | string or binary |
可选字段 | headers | string or binary |
必填字段 | topic | string |
可选字段 | partition | int |
批处理
官方给出方案:
# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('ss_read_kafka_1_topic')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
# 默认从最新的地方开始消费
init_df = spark.read\
.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("subscribe","test02")\
.load()
# 3- 数据处理
result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))
# 4- 数据输出
# 5- 启动流式任务
result_df.write.format("kafka")\
.option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
.option("topic","test02")\
.option("checkpointLocation", "hdfs://node1:8020/chk")\
.save()
备注 | Column | 数据类型 |
---|---|---|
可选字段 | key | string or binary |
必填字段 | value | string or binary |
可选字段 | headers | array |
必填字段 | topic | string |
可选字段 | partition | int |