4.3、Flink任务怎样读取Kafka中的数据

目录

1、添加pom依赖

2、API使用说明

3、这是一个完整的入门案例

4、Kafka消息应该如何解析

4.1、只获取Kafka消息的value部分

​4.2、获取完整Kafka消息(key、value、Metadata)

4.3、自定义Kafka消息解析器

5、起始消费位点应该如何设置

​5.1、earliest()

5.2、latest()

5.3、timestamp()

6、Kafka分区扩容了,该怎么办 —— 动态分区检查

7、在加载KafkaSource时提取事件时间&添加水位线

7.1、使用内置的单调递增的水位线生成器 + kafka timestamp 为事件时间

7.2、使用内置的单调递增的水位线生成器 + kafka 消息中的 ID字段 为事件时间


1、添加pom依赖

我们可以使用Flink官方提供连接Kafka的工具flink-connector-kafka

该工具实现了一个消费者FlinkKafkaConsumer,可以用它来读取kafka的数据

如果想使用这个通用的Kafka连接工具,需要引入jar依赖

<!-- 引入 kafka连接器依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>

2、API使用说明

官网链接:Apache Kafka 连接器

语法说明: 

// 1.初始化 KafkaSource 实例
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)                           // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)                     
    .setTopics("input-topic")                               // 必填:指定要消费的topic
    .setGroupId("my-group")                                 // 必填:指定消费者的groupid(不存在时会自动创建)
    .setValueOnlyDeserializer(new SimpleStringSchema())     // 必填:指定反序列化器(用来解析kafka消息数据,转换为flink数据类型)
    .setStartingOffsets(OffsetsInitializer.earliest())      // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
    .build(); 

// 2.通过 fromSource + KafkaSource 获取 DataStreamSource
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

3、这是一个完整的入门案例

开发语言:java1.8

flink版本:flink1.17.0

public class ReadKafka {
    public static void main(String[] args) throws Exception {
        newAPI();
    }

    public static void newAPI() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)
                .setTopics("20230810")                              // 必填:指定要消费的topic
                .setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)
                .setValueOnlyDeserializer(new SimpleStringSchema()) // 必填:指定反序列化器(用来解析kafka消息数据)
                .setStartingOffsets(OffsetsInitializer.earliest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
                .build();

        env.fromSource(source,
                WatermarkStrategy.noWatermarks(),
                "Kafka Source")
                .print()
        ;

        // 3.触发程序执行
        env.execute();
    }
}

4、Kafka消息应该如何解析

代码中需要提供一个反序列化器(Deserializer)来对 Kafka 的消息进行解析

反序列化器的功能:

                将Kafka ConsumerRecords转换为Flink处理的数据类型(Java/Scala对象)

反序列化器通过  setDeserializer(KafkaRecordDeserializationSchema.of(反序列化器类型)) 指定

下面介绍两种常用Kafka消息解析器:

        KafkaRecordDeserializationSchema.of(new JSONKeyValueDeserializationSchema(true)) :

                 1、返回完整的Kafka消息,将JSON字符串反序列化为ObjectNode对象

                 2、可以选择是否返回Kafak消息的Metadata信息,true-返回,false-不返回

        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class) :

                1、只返回Kafka消息中的value部分 

4.1、只获取Kafka消息的value部分

4.2、获取完整Kafka消息(key、value、Metadata)

kafak消息格式:

                key =  {"nation":"蜀国"}

                value = {"ID":整数}

    public static void ParseMessageJSONKeyValue() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("worker01:9092")               // 必填:指定broker连接信息 (为保证高可用,建议多指定几个节点)
                .setTopics("9527")                                  // 必填:指定要消费的topic
                .setGroupId("FlinkConsumer")                        // 必填:指定消费者的groupid(不存在时会自动创建)
                // 必填:指定反序列化器(将kafak消息解析为ObjectNode,json对象)
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        // includeMetadata = (true:返回Kafak元数据信息 false:不返回)
                        new JSONKeyValueDeserializationSchema(true)
                ))
                .setStartingOffsets(OffsetsInitializer.latest())  // 可选:指定启动任务时的消费位点(不指定时,将默认使用 OffsetsInitializer.earliest())
                .build();

        env
                .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print()
        ;

        // 3.触发程序执行
        env.execute();

    }

运行结果:    

常见报错: 

Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = 9527, partition = 0, leaderEpoch = 0, offset = 1064, CreateTime = 1691668775938, serialized key size = 4, serialized value size = 9, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@5e9eaab8, value = [B@67390400).
	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
	... 14 more
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxx': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"xxxx"; line: 1, column: 5]

报错原因:

          出现这个报错,一般是使用flink读取fafka时,使用JSONKeyValueDeserializationSchema

来解析消息时,kafka消息中的key 或者 value 内容不符合json格式而造成的解析错误

例如下面这个格式,就会造成解析错误  key=1000,value=你好

那应该怎么解决呢?

        1、如果有权限修改Kafka消息格式,可以将Kafka消息key&value内容修改为Json格式

        2、如果没有权限修改Kafka消息格式(比如线上环境,修改比较困难),可以重新实现

       JSONKeyValueDeserializationSchema类,根据所需格式来解析Kafka消息(可以参考源码)

4.3、自定义Kafka消息解析器

        生产中对Kafka消息及解析的格式总是各种各样的,当flink预定义的解析器满足不了业务需求时,可以通过自定义kafka消息解析器来完成业务的支持

例如,当使用 MyJSONKeyValueDeserializationSchema 获取Kafka元数据时,只返回了 offset、topic、partition 三个字段信息,现在需要`kafka生产者写入数据时的timestamp`,就可以通过自定义kafka消息解析器来完成

代码示例:

// TODO 自定义Kafka消息解析器,在 metadata 中增加 timestamp字段
public class MyJSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode>{

        private static final long serialVersionUID = 1509391548173891955L;

        private final boolean includeMetadata;
        private ObjectMapper mapper;

        public MyJSONKeyValueDeserializationSchema(boolean includeMetadata) {
            this.includeMetadata = includeMetadata;
        }

        @Override
        public void open(DeserializationSchema.InitializationContext context) throws Exception {
            mapper = JacksonMapperFactory.createObjectMapper();
        }

        @Override
        public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            ObjectNode node = mapper.createObjectNode();
            if (record.key() != null) {
                node.set("key", mapper.readValue(record.key(), JsonNode.class));
            }
            if (record.value() != null) {
                node.set("value", mapper.readValue(record.value(), JsonNode.class));
            }
            if (includeMetadata) {
                node.putObject("metadata")
                        .put("offset", record.offset())
                        .put("topic", record.topic())
                        .put("partition", record.partition())
                        // 添加 timestamp 字段
                        .put("timestamp",record.timestamp())
                ;
            }
            return node;
        }

        @Override
        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }

        @Override
        public TypeInformation<ObjectNode> getProducedType() {
            return getForClass(ObjectNode.class);
        }
    }

运行结果:


5、起始消费位点应该如何设置

起始消费位点说明:

        起始消费位点是指 启动flink任务时,应该从哪个位置开始读取Kafka的消息   

        下面介绍下常用的三个设置:    

                OffsetsInitializer.earliest()  :

                        从最早位点开始消

                        这里的最早指的是Kafka消息保存的时长(默认为7天,生成环境各公司略有不同)

                        该这设置为默认设置,当不指定OffsetsInitializer.xxx时,默认为earliest() 

                OffsetsInitializer.latest()   :

                        从最末尾位点开始消费

                        这里的最末尾指的是flink任务启动时间点之后生产的消息

                OffsetsInitializer.timestamp(时间戳) :

                        从时间戳大于等于指定时间戳(毫秒)的数据开始消费

下面用案例说明下,三种设置的效果,kafak生成10条数据,如下:

5.1、earliest()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("23230811")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从最早位置开始消费(该设置为默认设置)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .build();

运行结果:

5.2、latest()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("23230811")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从最末尾位点开始消费
        .setStartingOffsets(OffsetsInitializer.latest())
        .build();

运行结果:

5.3、timestamp()

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("23230811")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new MyJSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从指定时间戳后开始消费
        .setStartingOffsets(OffsetsInitializer.timestamp(1691722791273L))
        .build();

运行结果:


6、Kafka分区扩容了,该怎么办 —— 动态分区检查

        在flink1.13的时候,如果Kafka分区扩容了,只有通过重启flink任务,才能消费到新增分区的数据,小编就曾遇到过上游业务部门的kafka分区扩容了,并没有通知下游使用方,导致实时指标异常,甚至丢失了数据。

        在flink1.17的时候,可以通过`开启动态分区检查`,来实现不用重启flink任务,就能消费到新增分区的数据

开启分区检查:(默认不开启)

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒检查一次新分区

代码示例:

KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setBootstrapServers("worker01:9092")
        .setTopics("9527")
        .setGroupId("FlinkConsumer")
        // 将kafka消息解析为Json对象,并返回元数据
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(true)
        ))
        // 设置起始消费位点:从最末尾位点开始消费
        .setStartingOffsets(OffsetsInitializer.latest())
        // 开启动态分区检查(默认不开启)
        .setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区
        .build();

7、在加载KafkaSource时提取事件时间&添加水位线

可以在 fromSource(source,WatermarkStrategy,sourceName) 时,提取事件时间和制定水位线生成策略

注意:当不指定事件时间提取器时,Kafka Source 使用 Kafka 消息中的时间戳作为事件时间

7.1、使用内置的单调递增的水位线生成器 + kafka timestamp 为事件时间

代码示例:

    // 在读取Kafka消息时,提取事件时间&插入水位线
    public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("worker01:9092")
                .setTopics("9527")
                .setGroupId("FlinkConsumer")
                // 将kafka消息解析为Json对象,并返回元数据
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new MyJSONKeyValueDeserializationSchema(true)
                ))
                // 设置起始消费位点:从最末尾位点开始消费
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        env.fromSource(source,
                        // 使用内置的单调递增的水位线生成器(默认使用 kafka的timestamp作为事件时间)
                        WatermarkStrategy.forMonotonousTimestamps(),
                        "Kafka Source")
                // 通过 ProcessFunction 查看提取的事件时间和水位线信息
                .process(
                        new ProcessFunction<ObjectNode, String>() {
                            @Override
                            public void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {
                                // 当前处理时间
                                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                                // 当前水位线
                                long currentWatermark = ctx.timerService().currentWatermark();
                                StringBuffer record = new StringBuffer();
                                record.append("========================================\n");
                                record.append(kafkaJson + "\n");
                                record.append("currentProcessingTime:" + currentProcessingTime + "\n");
                                record.append("currentWatermark:" + currentWatermark + "\n");
                                record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");
                                record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");
                                out.collect(record.toString());

                            }
                        }
                ).print();

        // 3.触发程序执行
        env.execute();
    }

运行结果:

7.2、使用内置的单调递增的水位线生成器 + kafka 消息中的 ID字段 为事件时间

代码示例:

    // 在读取Kafka消息时,提取事件时间&插入水位线
    public static void KafkaSourceExtractEventtimeAndWatermark() throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.读取kafka数据
        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setBootstrapServers("worker01:9092")
                .setTopics("9527")
                .setGroupId("FlinkConsumer")
                // 将kafka消息解析为Json对象,并返回元数据
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new MyJSONKeyValueDeserializationSchema(true)
                ))
                // 设置起始消费位点:从最末尾位点开始消费
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();

        env.fromSource(source,
                        // 使用内置的单调递增的水位线生成器(使用 kafka消息中的ID字段作为事件时间)
                        WatermarkStrategy.<ObjectNode>forMonotonousTimestamps()
                                // 提取 Kafka消息中的 ID字段作为 事件时间
                                .withTimestampAssigner(
                                        (json, timestamp) -> Long.parseLong(json.get("value").get("ID").toString())
                                ),

                        "Kafka Source")
                // 通过 ProcessFunction 查看提取的事件时间和水位线信息
                .process(
                        new ProcessFunction<ObjectNode, String>() {
                            @Override
                            public void processElement(ObjectNode kafkaJson, ProcessFunction<ObjectNode, String>.Context ctx, Collector<String> out) throws Exception {
                                // 当前处理时间
                                long currentProcessingTime = ctx.timerService().currentProcessingTime();
                                // 当前水位线
                                long currentWatermark = ctx.timerService().currentWatermark();
                                StringBuffer record = new StringBuffer();
                                record.append("========================================\n");
                                record.append(kafkaJson + "\n");
                                record.append("currentProcessingTime:" + currentProcessingTime + "\n");
                                record.append("currentWatermark:" + currentWatermark + "\n");
                                record.append("kafka-ID:" + Long.parseLong(kafkaJson.get("value").get("ID").toString()) + "\n");
                                record.append("kafka-timestamp:" + Long.parseLong(kafkaJson.get("metadata").get("timestamp").toString()) + "\n");
                                out.collect(record.toString());

                            }
                        }
                ).print();

        // 3.触发程序执行
        env.execute();
    }

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/74406.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【腾讯云 TDSQL-C Serverless 产品体验】基于TDSQL-C 存储爬取的QQ音乐歌单数据

【腾讯云 TDSQL-C Serverless 产品体验】基于TDSQL-C 存储爬取的QQ音乐歌单数据 文章目录 【腾讯云 TDSQL-C Serverless 产品体验】基于TDSQL-C 存储爬取的QQ音乐歌单数据前言出现的背景一、TDSQL-C数据库是什么&#xff1f;二、TDSQL-C 的特点三、TDSQL-C的应用场景四、基于TD…

培训报名小程序-用户注册

目录 1 创建数据源2 注册用户3 判断用户是否注册4 完整代码总结 我们的培训报名小程序&#xff0c;用户每次打开时都需要填写个人信息才可以报名&#xff0c;如果用户多次报名课程&#xff0c;每次都需要填写个人信息&#xff0c;比较麻烦。 本篇我们就优化一下功能&#xff0c…

研发工程师玩转Kubernetes——启动、存活和就绪探针

启动&#xff08;Startup Probe&#xff09;、存活&#xff08;Liveness Probe&#xff09;和就绪探针(Readiness Probe)有其不同的用途和优先级。 优先级和用途 启动探针&#xff08;Startup Probe&#xff09;用于Pod内程序告诉kubernetes&#xff0c;其准备工作已经做好。…

【Servlet】(Servlet API HttpServlet 处理请求 HttpServletRequest 打印请求信息 前端给后端传参)

文章目录 Servlet APIHttpServlet处理请求 HttpServletRequest打印请求信息前端给后端传参 Servlet API Servlet中常用的API HttpServlet 实际开发的时候主要重写 doXXX 方法, 很少会重写 init / destory / service destory 服务器终止的时候会调用. //下面的注解把当前类和…

urllib爬虫模块

urllib爬取数据 import urllib.request as request# 定义url url "https://www.baidu.com" #模拟浏览器发起请求获取响应对象 response request.urlopen(url)""" read方法返回的是字节形式的二进制数据 二进制--》字符串 解码 decode( 编码的格式…

部署lawyer-llama

Git - Downloading PackageGit - Downloading PackageGit - Downloading Package 下载git&#xff0c;wget需要下载一下 &#xff08;GNU Wget 1.21.4 for Windows&#xff09;&#xff0c; Windows中git bash完全可以替代原生的cmd&#xff0c;但是对于git bash会有一些Linu…

一个基于SpringBoot+Vue前后端分离高校心理健康系统详细设计实现

博主介绍&#xff1a;✌全网粉丝30W,csdn特邀作者、博客专家、CSDN新星计划导师、Java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

仓储二代拣货标签接口

1.批量拣货更新标签信息接口 http://localhost/smartpick/associate/lightTags 代码形式&#xff1a; { url : http://localhost/smartpick/associate/lightTags, requestMethed : POST, requestParameter : { [ {"mac":"99.99.22.03","devty…

Docker中部署redis

1.部署redis要求 2.部署教程 连接容器中的redis redis部署完毕

Oracle-如何判断字符串包含中文字符串(汉字),删除中文内容及保留中文内容

今天遇见一个问题需要将字段中包含中文字符串的筛选出来 --建表 CREATE TABLE HADOOP1.AAA ( ID VARCHAR2(255) ); --添加字段INSERT INTO HADOOP1.AAA(ID)VALUES(理解);....--查询表内容SELECT * FROM HADOOP1.AAA;在网上查找了一下有以下三种方式&#xff1a; 第一种&#…

开源数据库Mysql_DBA运维实战 (DCL/日志)

SQL&#xff08;Structured Query Language 即结构化查询语言&#xff09; a.DDL语句 数据库定义语言&#xff1a; 数据库&#xff0c;表&#xff0c;视图&#xff0c;索引&#xff0c;存储过程&#xff0c;函数&#xff0c;创建删除ALTER(CREATE DROP ALTER) b.DML语句 数…

在Excel中将数值差距极大的两个序列用对比明显的折线图表示

在Excel中&#xff0c;如果两个数据序列的数值差距太大&#xff0c;用这样的数据序列生成折线图时&#xff0c;折线图会显得过于平缓&#xff0c;趋势对比不明显。如下图&#xff1a; 这时候只要将趋势图设置成双坐标轴&#xff0c;将其中一条趋势线绘制到次坐标轴上&#xff0…

java毕业设计-智慧食堂管理系统-内容快览

首页 智慧食堂管理系统是一种可以提高食堂运营效率的管理系统。它将前端代码使用Vue实现&#xff0c;后端使用Spring Boot实现。这个系统的目的是简化食堂管理&#xff0c;提高食堂服务质量。在现代快节奏的生活中&#xff0c;人们对餐饮服务提出了更高的要求&#xff0c;食堂管…

【Quarkus技术系列】「云原生架构体系」在云原生时代下的Java“拯救者”是Quarkus,那云原生是什么呢?

云原生时代下的Java"拯救者" 在云原生时代&#xff0c;其实Java程序是有很大的劣势的&#xff0c;以最流行的spring boot/spring cloud微服务框架为例&#xff0c;启动一个已经优化好&#xff0c;很多bean需要lazy load的application至少需要3-4秒时间&#xff0c;内…

C语言快速回顾(二)

前言 在Android音视频开发中&#xff0c;网上知识点过于零碎&#xff0c;自学起来难度非常大&#xff0c;不过音视频大牛Jhuster提出了《Android 音视频从入门到提高 - 任务列表》&#xff0c;结合我自己的工作学习经历&#xff0c;我准备写一个音视频系列blog。C/C是音视频必…

【Github】SourceTree技巧汇总

sourceTree登录github账户 会跳转到浏览器端 按照Git Flow 初始化仓库分支 克隆远程仓库到本地 推送变更到远程仓库 合并分支 可以看到目前的本地分支&#xff08;main、iOS_JS&#xff09;和远程分支&#xff08;origin/main、origin/HEAD、origin/iOS_JS&#xff09;目前所处…

C++多线程场景中的变量提前释放导致栈内存异常

多线程场景中的栈内存异常 在子线程中尝试使用当前函数的资源&#xff0c;是非常危险的&#xff0c;但是C支持这么做。因此C这么做可能会造成栈内存异常。 正常代码 #include <iostream> #include <thread> #include <windows.h>// 线程函数&#xff0c;用…

消防态势标绘工具,为消防基层工作助力

背景介绍 无人机测绘技术在消防领域的应用越来越普及&#xff0c;高清的二维正射影像和倾斜摄影实景三维模型能为消防态势标绘提供高质量的素材&#xff0c;消防队急需一个简便易用的、能够基于这些二三维的高清地图成果进行态势标绘的工具软件&#xff0c;使得消防“六熟悉”…

Rust 重载运算符|复数结构的“加减乘除”四则运算

复数 基本概念 复数定义 由实数部分和虚数部分所组成的数&#xff0c;形如a&#xff0b;bi 。 其中a、b为实数&#xff0c;i 为“虚数单位”&#xff0c;i -1&#xff0c;即虚数单位的平方等于-1。 a、b分别叫做复数a&#xff0b;bi的实部和虚部。 当b0时&#xff0c;a&…

(二)结构型模式:2、桥接模式(Bridge Pattern)(C++实现示例)

目录 1、桥接模式&#xff08;Bridge Pattern&#xff09;含义 2、桥接模式应用场景 3、桥接模式的UML图学习 4、C实现桥接模式的示例 1、桥接模式&#xff08;Bridge Pattern&#xff09;含义 桥接模式是一种结构型设计模式&#xff0c;它将抽象部分与实现部分分离&#…