结构化流的介绍

目录

有界数据和无界数据

有界数据

 无界数据

 结构化流

基本介绍

入门案例

结构化流的编程模型

数据结构

数据源(Source)

File Source

Kafka Source(Spark 和 Kafka 整合)

整合Kafka准备工作

从kafka中读取数据

流式处理

批处理

 数据写入Kafka中

流式处理

批处理


有界数据和无界数据

有界数据

数据有固定的开始和固定的结束,数据大小是固定的,我们称之为有界数据,对于有界数据,一般采取批处理方案(离线计算)

特点:

1.数据大小是固定的

2.程序处理有界数据,程序最终一定会停止

 无界数据

指数据有固定的开始,但是没有固定的结束,我们称之为无界数据,对于无界数据,我们一般采用流式处理方案(实时计算)

特点:

        1.数据没有明确的结束,也就是数据大小不固定

        2.数据是源源不断的过来

        3.程序处理无界数据,程序会一直运行,不会结束

 结构化流

基本介绍

  结构化流是构建在Spark SQL处理引擎之上的一个流式的处理引擎,主要是针对无界数据的处理操作。对于结构化流同样也支持多种语言操作的API:比如 Python Java Scala SQL ....

Spark的核心是RDD。RDD出现主要的目的就是提供更加高效的离线的迭代计算操作,RDD是针对的有界的数据集,但是为了能够兼容实时计算的处理场景,提供微批处理模型,本质上还是批处理,只不过批与批之间的处理间隔时间变短了,让我们感觉是在进行流式的计算操作,目前默认的微批可以达到100毫秒一次

真正的流处理引擎: Flink、Storm(早期流式处理引擎)、Flume(流式数据采集)

入门案例

需求:完成实时wordcount案例

程序流程:

代码测试操作步骤

首先: 先下载一个 nc(netcat) 命令. 通过此命令打开一个端口号, 并且可以向这个端口写入数据
yum -y install nc
    
执行nc命令, 开启端口号, 写入数据:
nc -lk 55555

注意: 要先启动nc,再启动我们的程序


查看端口号是否被使用命令:
netstat -nlp | grep 要查询的端口

注意事项

1.在结构化流中不能调用show()方法

2.需要使用writeStream().start()进行结果数据的输出

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder \
        .config("spark.sql.shuffle.partitions", 1) \
        .appName('structured_streaming_wordcount') \
        .master('local[*]') \
        .getOrCreate()

    # 2- 数据输入
    init_df = spark.readStream \
        .format("socket") \
        .option("host", "192.168.88.161") \
        .option("port", "55555") \
        .load()

    # 3- 数据处理
    result_df = init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt')
    )

    #在结构化流中不能调用show()方法
    # init_df.show()

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format('console').outputMode('complete').start().awaitTermination()

结构化流的编程模型

数据结构

    在结构化流中,我们可以将DataFrame称为误解的DataFrame或者无界的二维表

数据源(Source)

    结构化流默认提供了多种数据源,从而可以支持不同的数据源的处理工作,目前提供了如下数据源:

数据源(Source):

1.Sccket Source:网络套接字数据源,一般用于测试,也就是从网络上消费,读取数据

2.File Source:文件数据源,读取文件系统,一般用于测试,如果文件夹下发生变化,有新文件产生,那么就会触发程序的运行

3.Kafka Source:Kafka数据源,也就是作为消费者来读取Kafka中的数据,一般用于生产环境

4.Rata Source:速率数据源,一般用于测试,通过配置参数,由结构化流自动生成测试数据

对应官网文档内容:

https://spark.apache.org/docs/3.1.2/structured-streaming-programming-guide.html#input-sources

File Source

        将目录中写入的文件作为数据流读取,文件的文件格式为:text,csv.json,orc,parquet...

相关参数:

option参数参数说明
maxFilesPerTrigger每次触发时要考虑的最大新文件数(默认 no max)
latestFirst是否先处理最新的新文件,当有大量文件积压时有用
fileNameOnly

是否检查新文件只有文件名而不是完整路径(默认值:False)将此设置为true时,以下文件将被视为同一个文件,文件名"datadset.txt"相同

“file:///dataset.txt” “s3://a/dataset.txt "                              "s3n://a/b/dataset.txt"                                                    "s3a://a/b/c/dataset.txt"

读取代码通用格式

sparksession.readStream
    .format('CSV|JSON|Text|Parquet|ORC...')
    .option('参数名1','参数值1')
    .option('参数名2','参数值2')
    .option('参数名N','参数值N')
    .schema(元数据信息)
    .load('需要监听的目录地址')
    
    
针对具体数据格式,还有对应的简写API格式,例如:
    sparksession.readStream.csv(path='需要监听的目录地址',schema=元数据信息。。。)

注意:
        File Source总结
            1- 只能监听目录,不能监听具体的文件
            2- 可以通过*通配符的形式监听目录中满足条件的文件
            3- 如果监听目录中有子目录,那么无法监听到子目录的变化情况

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 结构化流数据来源
    print('结构化流数据来源_file_source')
    # 创建SparkSession顶级对象
    spark = SparkSession \
        .builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName("structiured_streaming_file_source") \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # 复杂API
    init_df = spark \
        .readStream \
        .format('csv') \
        .option('path', 'file:///export/data/pyspark_projects/04_Structured_Streaming/data') \
        .option('sep', ',') \
        .option('encoding', 'utf8') \
        .schema("id int,name string") 


    init_df = spark.readStream.csv(
        path='file:///export/data/pyspark_projects/04_Structured_Streaming/data/child',
        schema="id int,name string",
        sep=',',
        encoding='utf8'
    )
    # 数据处理
    # 数据输出
    # 启动结构化流
    init_df.writeStream.format('console').outputMode('append').start().awaitTermination()

Kafka Source(Spark 和 Kafka 整合)

         Spark天然支持集成Kafka, 基于Spark读取Kafka中的数据, 同时可以实施精准一次(仅且只会处理一次)的语义, 作为程序员, 仅需要关心如何处理消息数据即可, 结构化流会将数据读取过来, 转换为一个DataFrame的对象, DataFrame就是一个无界的DataFrame, 是一个无限增大的表

整合Kafka准备工作

如何放置相关的Jar包?  
    1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
        目录位置: /export/server/spark/jars
    
    2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
        目录位置:
            /root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
    
    3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
        hdfs的spark的jars目录下:  hdfs://node1:8020/spark/jars
        

    请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案:  spark-submit --jars jar包路径
    
    jar包下载地址:https://mvnrepository.com/

从kafka中读取数据

spark和kafka集成官网文档:

https://spark.apache.org/docs/3.1.2/structured-streaming-kafka-integration.html

流式处理

官方提供的方案:

# 订阅Kafka的一个Topic,从最新的消息数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 订阅Kafka的多个Topic,多个Topic间使用英文逗号进行分隔。从最新的消息数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 订阅一个Topic,并且指定header信息
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")


# 订阅符合规则的Topic,从最新的数据开始消费
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

如果有多个输出,那么只能在最后一个start的后面写awaitTermination()

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 结构化流数据来源
    print('结构化流数据来源_file_source')
    # 创建SparkSession顶级对象
    spark = SparkSession \
        .builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName("structiured_streaming_file_source") \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    #订阅一个topic,并且指定header信息
    init_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'search-log-topic') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()

    # 订阅符合规则的topic,并且指定header信息
    init_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribePattern', 'topic.*') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()


    #订阅多个topic,从最新的消息开始消费
    init_df = spark \
        .readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'test01,test02') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.writeStream.format('console').outputMode('append').start().awaitTermination()
    init_df.writeStream.format('console').outputMode('append').start().awaitTermination()

对接kafka后,返回的结果数据内容:

key:发送数据的key值,如果没有,就为null

value:最重要的字段,发送数据的value值,也就是消息内容,如果没有就为null

topic:表示消息从哪个Topic中消费出来

partition:分区编号,表示消费到的该条数据来源于Topic的哪个分区

offset:消息偏移量

timestamp:接收的时间戳

timestampType:时间戳类型

批处理

官方提供的方案:

# 订阅一个Topic主题, 默认从最早到最晚的偏移量范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
 
 
# 批处理订阅Kafka的多个Topic数据。并且可以通过startingOffsets和endingOffsets指定要消费的消息偏移
量(offset)范围。"topic1":{"0":23,"1":-2} 含义是:topic1,"0":23从分区编号为0的分区的
offset=23地方开始消费,"1":-2 从分区编号为1的分区的最开始的地方开始消费
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")


# 通过正则匹配多个Topic, 默认从最早到最晚的偏移量范围
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

注意:

 如果有指定startingOffsets或者endingOffsets,需要指定所有分区的offset

-1: latest,最新的地方
-2: earliest,最旧的地方

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
    # 结构化流数据来源
    print('结构化流数据来源_file_source')
    # 创建SparkSession顶级对象
    spark = SparkSession \
        .builder \
        .config('spark.sql.shuffle.partitions', 1) \
        .appName("structiured_streaming_file_source") \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # 订阅一个topic,并且指定header信息
    init_df = spark \
        .read \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'search-log-topic') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.show()
    print('=' * 50)

    # 订阅符合规则的topic,并且指定header信息
    init_df = spark \
        .read \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribePattern', 'topic.*') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.show()

    print('=' * 50)
    # 订阅多个topic,从最新的消息开始消费
    init_df = spark \
        .read \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1:9092,node2:9092') \
        .option('subscribe', 'test01,test02') \
        .option('includeHeaders', 'true') \
        .load()
    elt_df = init_df.selectExpr("cast(value as string)")
    # 启动流式
    elt_df.show()

    # 释放资源
    spark.stop()

 数据写入Kafka中

官方方案:

# 将Key和Value的数据都写入到Kafka当中
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()


# 将Key和Value的数据都写入到Kafka当中。使用DataFrame数据中的Topic字段来指定要将数据写入到Kafka集群
的哪个Topic中。这种方式适用于消费多个Topic的情况
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()

流式处理
# 数据写入Kafka中
# 写出到指定Topic
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的python解释器


os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    # 创建sparksession对象
    spark = SparkSession \
        .builder \
        .appName('spark_read_kafka_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # 默认从最新的地方消费
    init_df = spark.readStream \
        .format('kafka') \
        .option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
        .option('topic', 'test01') \
        .option('subscribe', 'test01') \
        .load()
    # 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 启动流式任务
    result_df.writeStream.format('kafka') \
        .option('kafka.bootstrap.servers', 'node1.itcast.cn:9092,node2.itcast.cn:9092') \
        .option('topic', 'test01') \
        .option("checkpointLocation", "hdfs://node1:8020/day10/chk") \
        .start() \
        .awaitTermination()

从数据内容中解析得到Topic,然后写入Kafka

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_multi_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribePattern","test.*")\
        .load()

    # 3- 数据处理
    # 错误写法:缺少topic字段
    # result_df = init_df.select(F.expr("topic as new_topic"),F.expr("concat(cast(value as string),'_',topic) as value"))
    result_df = init_df.select("topic",F.expr("concat(cast(value as string),'_',topic) as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.writeStream.format("console").outputMode("append").start()

    result_df.writeStream.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
        .start()\
        .awaitTermination()
批处理

官方给出方案:

# 从DataFrame中写入key-value数据到一个选项中指定的特定Kafka topic中
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()
 

# 使用数据中指定的主题将key-value数据从DataFrame写入Kafka
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .config("spark.sql.shuffle.partitions",1)\
        .appName('ss_read_kafka_1_topic')\
        .master('local[*]')\
        .getOrCreate()

    # 2- 数据输入
    # 默认从最新的地方开始消费
    init_df = spark.read\
        .format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("subscribe","test02")\
        .load()

    # 3- 数据处理
    result_df = init_df.select(F.expr("concat(cast(value as string),'_itheima') as value"))

    # 4- 数据输出
    # 5- 启动流式任务
    result_df.write.format("kafka")\
        .option("kafka.bootstrap.servers","node1.itcast.cn:9092,node2.itcast.cn:9092")\
        .option("topic","test02")\
        .option("checkpointLocation", "hdfs://node1:8020/day10/chk")\
        .save()

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/326691.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Hadoop——HDFS、MapReduce、Yarn期末复习版(搭配尚硅谷视频速通)

一、HDFS 1.HDFS概述 1.1 HDFS定义 HDFS(Hadoop Distributed File System),它是一个文件系统,用于存储文件,通过目录树来定位文件;其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自…

conda环境下FutureWarning: Pass sr=16000, n_fft=800 as keyword args问题解决

1 问题描述 在训练语音处理模型过程中,出现如下错误: audio.py:100: FutureWarning: Pass sr16000, n_fft800 as keyword args. From version 0.10 passing these as positional arguments will result in an errorreturn librosa.filters.mel(hp.samp…

Docker之网络配置的使用

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是君易--鑨,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的博客专栏《Docker之网络配置的使用》。🎯&…

芯品荟|电梯外呼面板屏驱市场调研报告

PART ONE 产品简介 - Introduction - 1.电梯外呼面板介绍 电梯外呼面板,用于显示电梯当前位置、运行状态和楼层信息,以便乘客在等待电梯时了解电梯的运行情况。 电梯外呼面板,按显示屏的种类,分为3类,分别是LED屏、L…

游戏开发,中小公司跳槽去大厂容易还是考研应届生校招容易?

游戏开发,中小公司跳槽去大厂容易还是考研应届生校招容易? 在之前的文章中,我们提到过,游戏开发行业首选直接进入游戏大厂。《开发者必读:如何选择适合的游戏开发公司?》因为大厂不仅能提供良好的职业发展…

记录汇川:H5U与Factory IO测试14

现实53工位的物料运输。 设置了自动连续存启动:就是一个一个运,按照顺序将空的货架填满。 设置了自动连续存停止:就是完成当前循环后退出。 设置了自动连续取启动:就是一个一个运,按照顺序将有货的货架清空。 设置…

js:使用canvas画一个半圆

背景 需求需要画一个半圆&#xff0c;或者多半圆&#xff0c;其实一下子就能想到 canvas 中的圆弧&#xff0c;核心使用 context.arc context.arc(x,y,r,sAngle,eAngle,counterclockwise)接下来我们看看示例 例一 <!DOCTYPE html> <html lang"en"> &…

SpringBoot集成p6spy

P6Spy 是一个可以用来在应用程序中拦截和修改数据操作语句的开源框架。 通过 P6Spy 我们可以对 SQL 语句进行拦截,相当于一个 SQL 语句的记录器,这样我们可以用它来作相关的分析,比如性能分析。这里主要用于在控制台打印SQL时能自动将问号替换成实际参数打印一个可执行的SQL…

将 pyparamvalidate 项目,发布至 pypi

目录 一、前置说明1、总体目录2、相关回顾3、本节目标 二、操作步骤1、项目目录2、编写 pyproject.toml 文件3、编写 LICENSE 文件4、编写 README.md 文件5、升级 pip、build、twine 工具6、打包发布的版本7、测试发布至 TestPyPI8、创建测试项目&#xff0c;测试发布结果9、正…

成功解决VScode进入到内置函数中调试

主要有两个关键步骤&#xff0c; 第一步 将launch.json中的"justMyCode"设为false 可通过使用ctrlshiftP搜索lauch.json找到次文件 如果找不到的话&#xff0c;可点击debug按钮&#xff0c;然后找到点击create a launch.json file创建 创建得到的launch.json如下&am…

直接win+r打开命令控制台安装element-ui 与 在项目目录下安装element-ui的区别是什么?

使用Windows运行命令&#xff08;WinR&#xff09;打开命令控制台&#xff08;通常指的是cmd或PowerShell&#xff09;并安装element-ui与在项目目录下打开命令控制台进行安装的主要区别在于当前工作目录的不同。 直接WinR打开命令控制台安装element-ui&#xff1a;这种方式下…

On the User Behavior Leakage from Recommender System Exposure

ACM TOIS 2023 代码链接 系统暴露&#xff1a;用户的当前推荐项目列表。 对推荐系统进行攻击也就是说根据用户当前推荐项目列表输出用户的历史行为。 论文试图解决什么问题&#xff1f; 本文试图解决的问题是&#xff1a;在推荐系统中&#xff0c;用户历史行为隐私是否可以从…

mysql配置(各种配置参数详解)

mysql配置文件 在MySQL中&#xff0c;常用的配置包括&#xff1a; 数据库服务器配置 bind-address&#xff1a;指定MySQL服务器绑定的IP地址&#xff0c;默认为0.0.0.0&#xff08;所有可用IP&#xff09;。port&#xff1a;指定MySQL服务器监听的端口号&#xff0c;默认为330…

vscode显示120字符或者80字符提示线或者显示垂直标尺

vscode显示120字符或者80字符提示线或者显示垂直标尺 一般规定一行代码不超过80或者120个字符。取决于团队的编码规范。 不同公司不同团队有不同的规定。 当单行代码过长。产生横向滚动条。使得代码难以阅读。 打开全局设置的settings.json /C:/Users/xxx/AppData/Roaming/Cod…

Simulink旧版本如何打开新版的模型文件

Simulink旧版本如何打开新版的模型文件 当用旧版本Simulink软件打开模型时会报错&#xff0c;是因为版本不兼容造成的 解决办法 在simulink的选项中去掉 do not load models created with newer version of Simulink

腾讯云优惠券怎样领取?附最新优惠券领取教程

腾讯云优惠券是腾讯云推出的一种优惠活动&#xff0c;通常包含代金券和折扣券两种形式&#xff0c;可以在购买腾讯云产品结算时抵扣部分费用或享受特定折扣&#xff0c;帮助用户降低购买腾讯云产品的成本。 一、腾讯云优惠券类型 1、代金券&#xff1a;代金券可以在购买腾讯云…

【Docker篇】从0到1搭建自己的镜像仓库并且推送镜像到自己的仓库中

文章目录 &#x1f50e;docker私有仓库&#x1f354;具体步骤 &#x1f50e;docker私有仓库 Docker私有仓库的存在为用户提供了更高的灵活性、控制和安全性。与使用公共镜像仓库相比&#xff0c;私有仓库使用户能够完全掌握自己的镜像生命周期。 首先&#xff0c;私有仓库允许…

高性能小模型SLM最新优化方案和热门应用盘点,附配套模型和开源代码

当大多数人都还在卷谁的大模型参数规模大的时候&#xff0c;聪明人已经开始搞“小模型”了&#xff08;doge&#xff09;。 这里的小模型指的小型语言模型&#xff08;Small Language Model&#xff0c;简称SLM&#xff09;&#xff0c;通常用于解决资源受限或实时性要求较高的…

有什么常用的服务器集群?

随着互联网的快速发展&#xff0c;服务器集群已经成为了构建高可用、高性能、可扩展和安全可靠的计算环境的必备基础设施。在众多服务器集群中&#xff0c;有一些常用的集群类型&#xff0c;下面将介绍其中几种常见的服务器集群。 一、负载均衡集群 负载均衡集群通过将网络请…

车机联网

通过笔记本电脑&#xff0c;D-link给车机提供网络 因为笔记本用的无线网络上网&#xff0c;将无线网络连接设置为共享 设置后的效果 本地连接属性设置 Dlink连接电脑和车机&#xff1b;获取车机的动态ip&#xff08;动态ip每次开关机都会变化&#xff0c;注意更新&#xff09…