Structured_Streaming和Kafka整合

结构化编程模型

输出终端/位置

默认情况下,Spark的结构化流支持多种输出方案:

1- console sink: 将结果数据输出到控制台。主要是用在测试中,并且支持3种输出模式
2- File sink: 输出到文件。将结果数据输出到某个目录下,形成文件数据。只支持append模式
3- foreach sink 和 foreachBatch sink: 将数据进行遍历处理。遍历后输出到哪里,取决于自定义函数。并且支持3种输出模式
4- memory sink: 将结果数据输出到内存中。主要目的是进行再次的迭代处理。数据大小不能过大。支持append模式和complete模式
5- Kafka sink: 将结果数据输出到Kafka中。类似于Kafka中的生产者角色。并且支持3种输出模式

File Sink

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('file_sink')\
        .master('local[1]')\
        .getOrCreate()

    # 输出到文件必须要设置checkpointLocation检查点路径
    spark.conf.set("spark.sql.streaming.checkpointLocation", "hdfs://node1:8020/chk")

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    """
        File Sink总结:
            1- 输出到文件必须要设置checkpointLocation检查点路径
            2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
            3- 我们可以使用触发器trigger来指定批处理的时间间隔,用来减少小文件的产生
    """
    init_df.writeStream\
        .format("csv")\
        .outputMode("append")\
        .option("sep",",")\
        .option("header","True")\
        .option("encoding","UTF-8")\
        .trigger(processingTime="20 seconds")\
        .start("file:///export/data/child")\
        .awaitTermination()
file sink总结:
1- 要设置检查点数据存放路径。否则会报如下错误
AnalysisException: checkpointLocation must be specified either through
2- 因为结构化流底层是微批处理,如果不手动指定处理间隔,程序会尽可能缩短两个批处理间的间隔。那么会导致小文件问题的产生
3- 可以通过触发器trigger解决小文件的问题。可以通过触发器来调整每一批次产生间隔的时间
4- 支持输出到本地文件系统和HDFS文件系统

foreach sink
允许对输出的数据进行任意的处理操作,具体如何处理由用户自定义函数决定。对输出的数据一个个进行处理操作。
使用方式主要有二种
在这里插入图片描述
方式一:

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('foreach_sink')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    """
        每输入一条数据都会调用foreach中的函数。
        取Row中某个字段的值:   
            方式一 row.字段名
            方式二 row['字段名']
    """
    def my_foreach_func(row):
        # 打开数据库连接

        # 存储到数据库中
        print(row,row.value)
        print(row['value'])

        # 关闭数据库连接

    init_df.writeStream.foreach(my_foreach_func).outputMode("append").start().awaitTermination()

方式二:这种方式的适用场景是需要和资源打交道的情况(例如:连接和关闭数据库、打开关闭文件等)。通过open和close来处理资源,通过process来对数据进行自定义的处理

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('foreach_sink')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    # 自定义类
    """
        foreach sink方式二
        1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
        2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
        3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
        4- 一般会和trigger一起配合使用,用来减少消耗资源的操作
    """
    class MyForeachFunc:
        def open(self,partition_id, epoch_id):
            print(f"partition_id={partition_id},epoch_id={epoch_id}")
            # 打开数据库连接
            print("打开数据库连接")
            return True

        def process(self,row):
            # 存储到数据库中
            print(row, row.value)

        def close(self, error):
            # 关闭数据库连接
            print("关闭数据库连接")

    init_df.writeStream\
        .foreach(MyForeachFunc())\
        .trigger(processingTime="20 seconds")\
        .outputMode("append")\
        .start()\
        .awaitTermination()
说明:
	open: 在一个批次中只会调用一次。返回值是bool类型,当返回True的时候,process方法才会被调用
	process: 会被调用多次,该批次内有多少行数据,就会被调用多少次
	close: 在一个批次中只会调用一次。用来关闭在open打开的资源
	
1- 必须要创建一个自定义类,类中必须要有3个方法:open、process、close,方法名称、形参不能改变
2- open中代码执行完成以后,返回值类型是bool类型。如果返回False,那么不会执行process方法;只有返回True的时候才会执行process方法
3- 一般open用来对资源进行初始化;process使用相关资源对数据进行自定义的处理;close用来对资源进行释放
4- 一般会和trigger一起配合使用,用来减少消耗资源的操作

foreachBatch Sink
在这里插入图片描述

import os
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('foreach_sink')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream\
        .format("socket")\
        .option("host","192.168.88.161")\
        .option("port","55555")\
        .load()

    # 3- 数据处理
    # 4- 数据输出
    # 5- 启动流式任务
    """
        batch_df:有界的DataFrame,可以调用show()方法
        batch_id:批次ID
    """
    def my_foreach_batch_func(batch_df, batch_id):
        batch_df.show()
        print(batch_id)

    init_df.writeStream.foreachBatch(my_foreach_batch_func).outputMode("append").start().awaitTermination()
说明: process_fun(batch_df, batch_id)
batch_df: 该批次中的数据形成的有界DataFrame
batch_id: 批次的编号

设置触发器Trigger

触发器Trigger:决定多久执行一次操作并且输出结果。也就是在结构化流中,处理完一批数据以后,等待一会,再处理下一批数据

主要提供如下几种触发器:

  • 1- 默认方案:也就是不使用触发器的情况。如果没有明确指定,那么结构化流会自动进行决策每一个批次的大小。在运行过程中,会尽可能让每一个批次间的间隔时间变得更短(常用)
result_df.writeStream.foreachBatch(func)\
    .outputMode('append')\
    .start()\
    .awaitTermination()
  • 2- 配置固定的时间间隔:在结构化流运行的过程中,当一批数据处理完以后,下一批数据需要等待一定的时间间隔才会进行处理**(常用,推荐使用)**
result_df.writeStream.foreachBatch(func)\
    .outputMode('append')\
    .trigger(processingTime='5 seconds')\
    .start()\
    .awaitTermination()
    
情形说明:
1- 上一批次的数据在时间间隔内处理完成了,那么会等待我们配置触发器固定的时间间隔结束,才会开始处理下一批数据
2- 上一批次的数据在固定时间间隔结束的时候才处理完成,那么下一批次会立即被处理,不会等待
3- 上一批次的数据在固定时间间隔内没有处理完成,那么下一批次会等待上一批次处理完成以后立即开始处理,不会等待
  • 3- 仅此一次:在运行的过程中,程序只需要执行一次,然后就退出。这种方式适用于进行初始化操作,以及关闭资源等
result_df.writeStream.foreachBatch(func)\
    .outputMode('append')\
    .trigger(once=True)\
    .start()\
    .awaitTermination()

Checkpoint检查点目录设置

设置检查点,目的是为了提供容错性。当程序出现失败了,可以从检查点的位置,直接恢复处理即可。避免出现重复处理的问题

检查点目录主要包含以下几个目录位置:
在这里插入图片描述

1-偏移量offsets: 记录每个批次中的偏移量。为了保证给定的批次始终包含相同的数据。在处理数据之前会将offset信息写入到该目录
2-提交记录commits: 记录已经处理完成的批次。重启任务的时候会检查完成的批次和offsets目录中批次的记录进行对比。确定接下来要处理的批次
3-元数据文件metadata: 和整个查询关联的元数据信息,目前只保留当前的job id
4-数据源sources: 是数据源(Source)各个批次的读取的详情
5-数据接收端sinks: 是数据接收端各个批次的写出的详情
6-状态state: 当有状态操作的时候,例如:累加、聚合、去重等操作场景,这个目录会用来记录这些状态数据。根据配置周期性的生成。snapshot文件用于记录状态

如何设置检查点:

1- SparkSession.conf.set("spark.sql.streaming.checkpointLocation", "检查点路径")
2- option("checkpointLocation", "检查点路径")

推荐: 检查点路径支持本地和HDFS。推荐使用HDFS路径

Spark和Kafka整合

从kafka中读取数据

spark和kafka集成官网文档:
https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

流式处理

官方给出方案:

# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 订阅一个Topic,并且指定header信息
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")


# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

从某一个Topic中读取消息数据:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test01")\
        .load()

    # 3- 数据处理
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    # 5- 启动流式任务
    """
        如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
    """
    result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
    result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
    result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()

按规则订阅Topic:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribePattern","test.*")\
        .load()

    # 3- 数据处理
    result_df1 = init_df.select("topic",F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("topic","cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    # 5- 启动流式任务
    """
        如果有多个输出,那么只能在最后一个start的后面写awaitTermination()
    """
    result_df1.writeStream.queryName("result_df1").format("console").outputMode("append").start()
    result_df2.writeStream.queryName("result_df2").format("console").outputMode("append").start()
    result_df3.writeStream.queryName("result_df3").format("console").outputMode("append").start().awaitTermination()

对接kafka后,返回的结果数据内容:

key: 发送数据的key值。如果没有,就为null
value: 最重要的字段。发送数据的value值,也就是消息内容。如果没有,就为null
topic: 表示消息是从哪个Topic中消费出来
partition: 分区编号。表示消费到的该条数据来源于Topic的哪个分区
offset: 表示消息偏移量

timestamp: 接收的时间戳
timestampType: 时间戳类型(无意义)

类型的说明:

列名类型
keybinary
valuebinary
topicstring
partitionint
offsetlong
timestamptimestamp
timestampTypeint
headers (optional)array

批处理

官方给出方案:

# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 
# 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1,"0":23从分区编号为0的分区的
offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

订阅一个Topic:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('sparksql_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从Topic开头一直消费到结尾
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test02")\
        .load()

    # 3- 数据处理
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    print("result_df1")
    result_df1.show()

    print("result_df2")
    result_df2.show()

    print("result_df3")
    result_df3.show()

    # 5- 释放资源
    spark.stop()

指定startingOffsets参数:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('sparksql_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从Topic开头一直消费到结尾
    # 对每个分区指定具体消费的offset,了解即可。实际工作很少用
    # init_df = spark.read\
    #     .format("kafka")\
    #     .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
    #     .option("subscribe","test02,test03")\
    #     .option("startingOffsets","""{"test02":{"0":1}}""")\
    #     .load()

    init_df = spark.read \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \
        .option("subscribe", "test02,test03") \
        .option("startingOffsets", "earliest") \
        .load()

    # 3- 数据处理
    result_df1 = init_df.select(F.expr("cast(value as string) as value"))

    # selectExpr = select + F.expr
    result_df2 = init_df.selectExpr("cast(value as string) as value")

    result_df3 = init_df.withColumn("value",F.expr("cast(value as string)"))

    # 4- 数据输出
    print("result_df1")
    result_df1.show()

    print("result_df2")
    result_df2.show()

    print("result_df3")
    result_df3.show(n=100)

    # 5- 释放资源
    spark.stop()

可能遇到的错误:
在这里插入图片描述

原因: 如果有指定startingOffsets或者endingOffsets,需要指定所有分区的offset

-1: latest,最新的地方
-2: earliest,最旧的地方

必备参数:

选项说明
assign通过一个Json 字符串的方式来表示: {“topicA”:[0,1],“topicB”:[2,4]}设置使用特定的TopicPartitions
subscribe以逗号分隔的Topic主题列表要订阅的主题列表
subscribePattern正则表达式字符串订阅匹配符合条件的Topic。assign、subscribe、subscribePattern任意指定一个。
kafka.bootstrap.servers以英文逗号分隔的host:port列表指定kafka服务的地址

数据写入Kafka中

流式处理

官方给出方案:

# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()


# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()

写出到指定Topic

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test01")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("topic","test02")\
        .option("checkpointLocation", "hdfs://node1:8020/chk")\
        .start()\
        .awaitTermination()

从数据内容中解析得到Topic,然后写入Kafka:

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribePattern","test.*")\
        .load()

    # 3- 数据处理
    # 错误写法:缺少topic字段
    # result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
    result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format("console").outputMode("append").start()

    result_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("checkpointLocation", "hdfs://node1:8020/chk")\
        .start()\
        .awaitTermination()

可能遇到的错误一:
在这里插入图片描述

原因: 当从数据中解析得到Topic信息的时候,最终输出到Kafka的那个DataFrame中必须要有topic

可能遇到的错误二:
在这里插入图片描述

原因: 输出到Kafka中的数据,需要命名value。而且数据类型需要是string或者binary(二进制)
备注Column数据类型
可选字段keystring or binary
必填字段valuestring or binary
可选字段headersstring or binary
必填字段topicstring
可选字段partitionint

批处理

官方给出方案:

# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()
  

# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test02")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.write.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("topic","test02")\
        .option("checkpointLocation", "hdfs://node1:8020/chk")\
        .save()
备注Column数据类型
可选字段keystring or binary
必填字段valuestring or binary
可选字段headersarray
必填字段topicstring
可选字段partitionint

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/322837.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据绑定,defineProperty,v-on,事件处理

目录​​​​​​​ v-bind单向数据绑定 defineProperty 是v-on的简写 事件处理 v-bind单向数据绑定 从name绑定到v-bind到value单向数据绑定&#xff1a; <input type"text" :value"name"> <input type "text" v-model"na…

【Spring】SpringBoot 统一功能处理

文章目录 前言1. 拦截器1.1 什么是拦截器1.2 拦截器的使用1.2.1 自定义拦截器1.2.2 注册配置拦截器 1.3 拦截器详解1.3.1 拦截路径1.3.2 拦截器执行流程1.3.3 适配器模式 2. 统一数据返回格式3. 统一异常处理 前言 在日常使用 Spring 框架进行开发的时候&#xff0c;对于一些板…

什么是Java泛型?泛型在Java中应用场景

目录 一、什么是Java泛型 二、泛型类 三、泛型接口 四、泛型方法 一、什么是Java泛型 Java泛型是一种在编译时进行类型检查和类型安全的机制。它允许编写能够操作多种类型的代码&#xff0c;而不需要进行类型转换或使用Object类型。通过在定义类、接口或方法时使用泛型参数…

Arduino快速上手esp8266方案开发

认识ESP8266 ESP8266 是 Espressif Systems 生产的 Wi-Fi 片上系统 (SoC)。它非常适合物联网和家庭自动化项目&#xff0c;目前有非常高的市场普及率&#xff0c;还有更加高端的同时支持wifi和蓝牙的双核心芯片ESP32&#xff0c;可以在乐鑫官网查看完整的芯片列表。 ESP8266芯…

克服大模型(LLM)部署障碍,全面理解LLM当前状态

近日&#xff0c;CMU Catalyst 团队推出了一篇关于高效 LLM 推理的综述&#xff0c;覆盖了 300 余篇相关论文&#xff0c;从 MLSys 的研究视角介绍了算法创新和系统优化两个方面的相关进展。 在人工智能&#xff08;AI&#xff09;的快速发展背景下&#xff0c;大语言模型&…

element中el-cascader级联选择器只有最后一级可以多选

文章目录 一、前言二、实现2.1、设置popper-class和multiple2.2、设置样式 三、最后 一、前言 element-ui中el-cascader级联选择器只有最后一级可以多选&#xff0c;其它级只有展开子节点的功能&#xff0c;如下图所示&#xff1a; 可以观察到最后一级的li节点上没有属性aria-…

Python爬虫—requests模块简单应用

Python爬虫—requests模块简介 requests的作用与安装 作用&#xff1a;发送网络请求&#xff0c;返回响应数据 安装&#xff1a;pip install requests requests模块发送简单的get请求、获取响应 需求&#xff1a;通过requests向百度首页发送请求&#xff0c;获取百度首页的…

MATLAB R2023b for Mac 中文

MATLAB R2023b 是 MathWorks 发布的最新版本的 MATLAB&#xff0c;适用于进行算法开发、数据可视化、数据分析以及数值计算等任务的工程师和科学家。它包含了一系列新增功能和改进&#xff0c;如改进了数据导入工具&#xff0c;增加了对数据帧和表格对象的支持&#xff0c;增强…

pr怎么剪掉不要的部分?

在我们的日常工作中&#xff0c;剪视频的时候常常会需要把某个视频中我们不想要的地方剪掉&#xff0c;有的时候是一段&#xff0c;也有可能是好几段。但具体应该怎么样把不要的视频剪掉呢&#xff1f;其实&#xff0c;用pr软件就可以啦&#xff01;那么&#xff0c;pr怎么剪掉…

微信小程序开发WebSocket通讯

官方文档说明&#xff1a;入口 WebSocket连接的链接只支持wss加密方式&#xff0c;且只能用域名的方式 该域名还要在微信公众平台的小程序中登记才能使用&#xff0c;开发->开发管理->服务器域名->修改 该域名要和https使用的一致 以域名地址&#xff1a;dtu.aab…

2024年第02周农产品价格报告

一、摘要 农产品价格监测主要涉及对畜禽类产品、水产品、蔬菜类产品、水果类产品的价格&#xff0c;以周为单位&#xff0c;进行变化情况的数据监测。其中&#xff0c;蔬菜类产品共18种&#xff0c;分别为大白菜、西红柿、黄瓜、青椒、芹菜、土豆、白萝卜、茄子、豆角、胡萝卜…

华媒舍:高效率的新闻资讯新闻媒体宣发套餐内容推广计划方案

怎样让自己的新闻资讯可以被大众孰知&#xff0c;变成了每一个新闻媒体宣发者一同存在的困难。下面我们就给大家介绍一套高效率的新闻资讯新闻媒体宣发套餐内容推广计划方案&#xff0c;致力于帮助新闻媒体宣发者提升宣发高效率&#xff0c;提高新闻资讯的传播性。 1.新闻媒体宣…

如何通过内网穿透实现公网访问Portainer管理监控Docker容器

文章目录 前言1. 部署Portainer2. 本地访问Portainer3. Linux 安装cpolar4. 配置Portainer 公网访问地址5. 公网远程访问Portainer6. 固定Portainer公网地址 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风…

GoZero微服务个人探究之路(二)Go-Zero官方api demo示例探究

官方文档api-demo教程部分网址如下&#xff1a; api demo 代码生成 | go-zero Documentation 官方demo的架构如下&#xff1a; 编辑 etc包下&#xff1a; demo-api.yaml 编辑服务名称&#xff1a;demo-apiHOST地址&#xff1a;0.0.0.0监听所有可用网络接口Port地址&#…

XSKY助力首个云原生、分布式、全栈国产化银行核心系统投产上线

近日&#xff0c;。该项目是业内首个实际投产的云原生、分布式、全栈国产化的银行核心业务系统&#xff0c;是金融科技领域突破关键核心技术应用的重大实践。 在杭州银行的新一代核心系统中&#xff0c;XSKY星辰天合具备天合翔宇“一池多芯”技术的对象存储支持了核心业务模块…

每日一题 82. 删除排序链表中的重复元素 II(中等,链表)

和昨天差不多&#xff0c;今天的是把所有重复数字的节点都删除&#xff08;昨天留了一个&#xff09; 显然当我们发现重复数字时&#xff0c;需要重复的第一个数字的前一个节点才能把重复数字删完&#xff0c;所有在while循环中我们每次判断 t.next 和 t.next.next 的值是否重复…

AI模型理解误区:微调垂直行业-VS-企业专属知识库或AI助理

概述 企业定制私有化大模型的区别&#xff0c;分为训练大模型和调用大模型两种方向&#xff0c;以及企业自己的智能客服的实现方法。 - 企业定制的私有化大模型与一般的大模型不同&#xff0c;需要高成本训练。- 企业可以选择调用已经训练好的大模型来应用。- 企业可以使用向量…

Zabbix监控(1)

目录 一.什么是zabbix Zabbix 组件&#xff1a; 主动模式&#xff1a; 被动模式&#xff1a; Zabbix 工作原理&#xff1a; zabbix 监控原理&#xff1a; 二.Zabbix 6.0 部署 先安装nginx&#xff0c;php&#xff08;yum源安装&#xff09;&#xff1a; 安装nginx&…

【面试突击】Java内存模型实战

&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308;&#x1f308; 欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术 的推送 发送 资料 可领取 深入理…

python的包argparse介绍

argparse是一个用来解析命令行参数的 Python 库&#xff0c;它是 Python 标准库的一部分。 1、未使用argparse: import math# 计算圆柱体的体积 def cal_vol(radius,height):vol math.pi * pow(radius,2) * heightreturn vol if __name____main__:print(cal_vol(2,4))2、使用…