Kafka 入门笔记

课程地址

概述

定义

Kafka 是一个分布式的基于发布/订阅模式消息队列(MQ)

发布/订阅:消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接受感兴趣的消息

在这里插入图片描述

消息队列

消息队列应用场景:缓存/消峰、解耦、异步通信

消峰:

在这里插入图片描述

秒杀系统:10亿人发请求(数据量约为 1T)全部存入消息队列,服务端只取前 100 条数据处理,避免了服务端压力过大

解耦:
在这里插入图片描述

异步通信:

在这里插入图片描述
发布订阅模式:
在这里插入图片描述

Kafka 基础架构

在这里插入图片描述

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响

Broker:一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成,一个 broker 可以容纳多个 topic

Topic:可以理解为一个队列,生产者和消费者面向的都是一个 Topic

Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker 上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列

Replica:副本。一个 topic 的每个分区都有若干副本,一个 Leader 和若干 Follower

Kafka 快速入门

安装部署

cd /opt/software/
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz
tar -zxvf kafka_2.12-3.6.1.tgz -C /opt/module

下载到 /opt/software 目录,然后解压到 /opt/module 目录,最后修改配置文件 server.properties

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# A comma separated list of directories under which to store log files
log.dirs=/opt/module/kafka_2.12-3.6.1/datas
zookeeper.connect=u22a:2181,u22b:2181,u22c:2181

先启动 zookeeper,再启动 kafka

bin/kafka-server-start.sh -daemon ../config/server.properties
bin/kafka-server-stop.sh

集群启停脚本:

#! /bin/bash

case $1 in
"start") {
    for i in u22a u22b u22c; do
        echo "-------- start $i kafka --------"
        ssh $i "/opt/module/kafka_2.12-3.6.1/bin/kafka-server-start.sh -daemon /opt/module/kafka_2.12-3.6.1/config/server.properties"
    done
};;
"stop") {
    for i in u22a u22b u22c; do
        echo "-------- stop $i kafka --------"
        ssh $i "/opt/module/kafka_2.12-3.6.1/bin/kafka-server-stop.sh"
    done
};;
esac

kafka 命令行操作

主题命令行操作:

在这里插入图片描述

$ ./kafka-topics.sh --bootstrap-server u22b:9092 --list

$ ./kafka-topics.sh --bootstrap-server u22b:9092 --create --topic first --partitions 3 --replication-factor 2
Created topic first.

$ ./kafka-topics.sh --bootstrap-server u22b:9092 --describe --topic first
Topic: first    TopicId: nSI1J7EWQ06EbmQkLTBpYg PartitionCount: 3       ReplicationFactor: 2    Configs:
        Topic: first    Partition: 0    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: first    Partition: 1    Leader: 1       Replicas: 1,0   Isr: 1,0
        Topic: first    Partition: 2    Leader: 0       Replicas: 0,2   Isr: 0,2
        
$ ./kafka-topics.sh --bootstrap-server u22b:9092 --delete --topic first
$ ./kafka-topics.sh --bootstrap-server u22b:9092 --alter --topic first --partitions 6

分区个数只能改大不能改小

kafka 生产者消费者命令行操作:

$ ./kafka-console-producer.sh --bootstrap-server u22a:9092 --topic first
$ ./kafka-console-consumer.sh --bootstrap-server u22a:9092 --topic first
$ ./kafka-console-consumer.sh --bootstrap-server u22a:9092 --topic first --from-beginning

分组消费:

./kafka-console-producer.sh --bootstrap-server u22a:9092 --topic first --group kafka1
./kafka-console-consumer.sh --bootstrap-server u22a:9092 --topic first --group kafka1

如果使用时主题不存在,会自动创建

Kafka 架构深入

kafka 工作流程及文件存储机制

在这里插入图片描述

一个 topic 下的每一个分区都单独维护一个 offset,所以分发到不同分区中的数据是不同的数据。消费者的分区维护的是一个消费者组一个主题的一个分区维护一个 offset

同一个消费者组能够支持断点续传:

$ ./kafka-console-consumer.sh --bootstrap-server u22a:9092 --topic first --group kafka1
$ ./kafka-console-producer.sh --bootstrap-server u22a:9092 --topic first

在这里插入图片描述
文件存储机制:

在这里插入图片描述
在这里插入图片描述

index 和 log 文件以当前 segment 的第一条消息的 offset 命名

在这里插入图片描述

index 文件存储索引信息,索引信息按照数组逻辑排列。log 文件存储数据,数据直接紧密排列,索引文件中的元数据指向对应数据文件中的 message 的物理偏移地址

Kafka 生产者

消息发送流程

在这里插入图片描述

在这里插入图片描述

相关参数:

batch.size:只有数据积累到 batch.size 之后,sender 会发送数据
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据

异步发送 API

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "u22a:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
        	//ProducerRecord<K, V>(totpic, value);
            producer.send(new ProducerRecord<String, String>("first", "atguigu " + i));
        }
        producer.close();
    }
}

在终端监视:

$ ./kafka-console-consumer.sh --bootstrap-server u22a:9092 --topic first --group kafka1
atguigu 0
atguigu 1
atguigu 2
atguigu 3
atguigu 4
atguigu 5
atguigu 6
atguigu 7
atguigu 8
atguigu 9

producer 在关闭之前会 flush 缓冲区

public class CustomProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "u22a:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 非必要参数
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "atguigu " + i));
        }
        producer.close();   // flush
    }
}

生产者有回调函数的 API:

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CustomProducerWithCallBack {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "u22a:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 非必要参数
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "atguigu " + i), new Callback() {
                // 匿名子类:直接重写接口中的方法
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 发送消息成功,收到 ack 时调用
                    // 发送消息遇到异常,也会调用
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("get ack from " + recordMetadata.topic() + ": "
                                + recordMetadata.partition() + ": " + recordMetadata.offset());
                    }
                }
            });
        }
        producer.close();   // flush
    }
}

同步发送 API

send() 函数返回一个 Future 对象,直接对其调用 get() 方法即可同步调用

public class CustomProducerWithCallBackSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "u22a:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 非必要参数
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "atguigu " + i), new Callback() {
                // 匿名子类:直接重写接口中的方法
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 发送消息成功,收到 ack 时调用
                    // 发送消息遇到异常,也会调用
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("get ack from " + recordMetadata.topic() + ": "
                                + recordMetadata.partition() + ": " + recordMetadata.offset());
                    }
                }
            }).get();
            System.out.println("send " + i);
        }
        producer.close();   // flush
    }
}

分区策略

  1. 指明 partition 的情况下,直接将指明的值作为 partition 的值
  2. 没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值
  3. 既没有 partition 值也没有 key 值的情况下,kafka 采用 Sticky Partition,随机选择一个分区,并尽可能一直使用该分区,待该分区的 batch 已满或者已完成,kafka 再随机选择一个分区使用

指定分区:

for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", 0, "", "atguigu " + i), new Callback() {
                // 匿名子类:直接重写接口中的方法
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // 发送消息成功,收到 ack 时调用
                    // 发送消息遇到异常,也会调用
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        System.out.println("get ack from " + recordMetadata.topic() + ": "
                                + recordMetadata.partition() + ": " + recordMetadata.offset());
                    }
                }
            });
        }

自定义分区器

// CustomPartitioner.java
public class CustomPartitioner implements Partitioner {
    public static void main(String[] args) throws InterruptedException {
    }

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        String s1 = o1.toString();
        if (s1.contains("atguigu")) {
            return 1;
        }
        return 0;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

在生产者中注册分区器即可:

// 注册使用自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.partition.CustomPartitioner");

数据可靠性

数据可靠性:ack + 全同步机制

为了保证 producer 发送的数据能可靠的发送到指定的 topic,topic 的每个 partition 收到 producer 发送的数据后,都需要向 producer 发送 ack,如果 producer 收到 ack,就会进行下一轮发送,否则重新发送

在这里插入图片描述

kafka 选用了第二种方案,虽然它受到网络延迟的影响,但是由于集群一般位于同一个局域网,网速对 kafka 的影响比较小

第二种方案带来一个问题:如果有一个 follower 单点故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去。为此,kafka 引入了 ISR:in-sync replica set

在这里插入图片描述

在不同的时间点回复 ack 会影响速度和数据可靠性,这个级别可以通过参数 acks 配置:

  • 0:partition 的 leader 接收到消息还没写入磁盘就返回 ack,当 leader 故障就会丢失数据,但是这样延迟最低
  • 1:partition 的 leader 接收到消息落盘成功后回复 ack,如果在 follower 同步成功之前 leader 故障,会丢失数据
  • -1:全部落盘成功才回复 ack。但是如果在 follower 同步完成后,broker 发送 ack 之前,leader 发生故障,那么会造成数据重复

注意 acks == 1 的情况,数据还存在原 leader 的磁盘里没有丢失,但是因为选举机制,新的 leader 无法感知原数据的存在,从整个系统来看,数据丢失了:

在这里插入图片描述

数据重复的情况:

在这里插入图片描述

在这里插入图片描述

将各自 log 文件高于 HW 的部分截掉,然后从新的 leader 同步数据

Eaxctly Once

在这里插入图片描述

Producer 事务

在这里插入图片描述

Kafka 消费者

消费方式

在这里插入图片描述

基础消费者

public class CustomConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "u22a:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 必须设置消费者组: --group kafka2
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka2");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);

        // 注册主题: --topic first
        ArrayList<String> strings = new ArrayList<>();
        strings.add("first");
        kafkaConsumer.subscribe(strings);

        while(true) {
            // 设置超时等待时长
            ConsumerRecords<String, String> res = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> r : res) {
                System.out.println(r.toString());
            }
        }
    }
}

消费者组

同一个主题的分区,同一时刻只能有一个消费者消费

重新发送到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只有一个消费者消费到数据

分区分配策略

一个消费者组中有多个消费者,一个主题下有多个分区,所以必然会涉及到分区的分配问题,即确定哪个分区由哪个消费者消费

kafka 有 3 种分配策略:RoundRobin,Range 和 Sticky。默认使用 Range 分区器

在这里插入图片描述

更改分区分配策略:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");

在这里插入图片描述

粘性分区分配策略(StickyAssignor),首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化

offset 的维护

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以consumer需要实时记录自己消费到了哪个 ofset,以便故障恢复后继续消费

Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中。从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka一个内置的 topic 中,该topic为 __consumer_offsets

查看该主题:

先修改配置文件,增加配置项 exclude.internal.topics=false

./bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server u22b:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

自动提交 offset

  • enable.auto.commit:是否开启自动提交 offset
  • auto.commit.interval.ms:自动提交 offset 时间间隔
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

重置 offset

auto.offset.reset = earliest | latest | none

当 kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时

如果一直使用同一个消费者组,会触发断点续传,能够消费到之前的数据

如果使用一个新的消费者组来消费,会触发 offset 重置,相当于 from beginning

// 新的消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka3");
// 一旦使用新的消费者组,重置 offset
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

手动提交 offset

如果自动提交 offset,会在内存中拉取到数据的时候就完成 offset 的提交

ConsumerRecords<String, String> res = kafkaConsumer.poll(Duration.ofSeconds(1));

手动提交 offset 的方法有 2 种,分别是 commitSync(同步提交)和 commitAsync (异步提交)

二者的相同点是,都会将本次 poll 的一批数据最高的偏移量提交

不同点是,commitSync 会阻塞当前线程,一直到提交成功,并且失败后会自动重试

commitAsync 没有失败重试机制,故有可能提交失败

首先关闭自动提交的配置参数:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
ConsumerRecords<String, String> res = kafkaConsumer.poll(Duration.ofSeconds(1));
 // kafkaConsumer.commitSync();     // 同步提交
 kafkaConsumer.commitAsync(new OffsetCommitCallback() {
     @Override
     public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
         if (e != null) {
             e.printStackTrace();
         } else {
             System.out.println(map);
         }
     }
 }); // 异步提交,更高效

Consumer 事务(精准一次性消费)

kafka 消费端将消费过程和提交 offset 过程做原子绑定

Kafka 高效读写数据

顺序写磁盘:写的过程是一直追加到文件末端,为顺序写

在这里插入图片描述
零拷贝技术:

在这里插入图片描述

Zookeeper 在 Kafka 中的作用

Kafka 监控

安装:

cd /opt/software
wget https://github.com/smartloli/kafka-eagle-bin/archive/v3.0.1.tar.gz
tar -zxvf v3.0.1.tar.gz
cd kafka-eagle-bin-3.0.1/
tar -axvf efak-web-3.0.1-bin.tar.gz -C /opt/module/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/383684.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【北邮鲁鹏老师计算机视觉课程笔记】04 fitting 拟合

【北邮鲁鹏老师计算机视觉课程笔记】04 fitting 拟合 1 拟合的任务 如何从边缘找出真正的线&#xff1f; 存在问题 ①噪声 ②外点、离群点 ③缺失数据 2 最小二乘 存在的问题 3 全最小二乘 度量的是点到直线的距离而不是点在y方向到直线的距离 提示&#xff1a;点到直线的…

【北邮鲁鹏老师计算机视觉课程笔记】05 Hough 霍夫变换

【北邮鲁鹏老师计算机视觉课程笔记】05 Hough 霍夫变换 1 投票策略 考虑到外点率太高 ①让直线上的每一点投票 ②希望噪声点不要给具体的任何模型投票&#xff0c;即噪声点不会有一致性的答案 ③即使被遮挡了&#xff0c;也能把直线找出来 参数空间离散化 直线相当于就是m,b两…

vue核心技术(二)

◆ 指令补充 指令修饰符 通过 "." 指明一些指令 后缀&#xff0c;不同 后缀 封装了不同的处理操作 → 简化代码 v-bind 对于样式控制的增强 为了方便开发者进行样式控制&#xff0c; Vue 扩展了 v-bind 的语法&#xff0c;可以针对 class 类名 和 style 行内样式…

网络安全检查表

《网络攻击检查表》 1.应用安全漏洞 2.弱口令&#xff0c;默认口令 3.服务器互联网暴露 4.操作系统&#xff0c;中间件安全漏洞 5.研发服务器&#xff0c;邮件服务器等安全检查

PySpark(四)PySpark SQL、Catalyst优化器、Spark SQL的执行流程、Spark新特性

目录 PySpark SQL 基础 SparkSession对象 DataFrame入门 DataFrame构建 DataFrame代码风格 DSL SQL SparkSQL Shuffle 分区数目 DataFrame数据写出 Spark UDF Catalyst优化器 Spark SQL的执行流程 Spark新特性 自适应查询(SparkSQL) 动态合并 动态调整Join策略 …

Android---Jetpack Compose学习003

Compose 状态。本文将探索如何在使用 Jetpack Compose 时使用和考虑状态&#xff0c;为此&#xff0c;我们需要构建一个 TODO 应用&#xff0c;我们将构建一个有状态界面&#xff0c;其中会显示可修改的互动式 TODO 列表。 状态的定义。在科学技术中&#xff0c;指物质系统所处…

【XR806开发板试用】轻松连上华为云实现物联网

本文为极术社区XR806试用活动文章。 一.开始 偶然的机会在网上看到了鸿蒙开发板的试用,作为一个"老鸿蒙"岂能放弃这个机会,报名之后不出意料地得到了使用名额,在此感谢极术社区. 收到开发板之后其实还有点失望了,就那么一个小小的核心板,其他啥也没有,连一根数据线…

图灵日记--MapSet字符串常量池反射枚举Lambda表达式泛型

目录 搜索树概念实现性能分析和 java 类集的关系 搜索概念及场景模型 Map的使用Map常用方法 Set的说明常见方法说明 哈希表冲突-避免-负载因子调节冲突-解决-闭散列冲突-解决-开散列/哈希桶冲突严重时的解决办法 实现和 java 类集的关系 字符串常量池String对象创建intern方法 …

MongoDB 与 mongo-express docker 安装

MongoDB 和 mongo-express 与 MySQL 不同&#xff0c;MongoDB 为 NoSQL 数据库&#xff0c;MongoDB 中没有 table &#xff0c;schema 概念&#xff0c;取而代之的 collection&#xff0c;其中 collection 存储的为 BSON 格式&#xff0c;是一种类似于 JSON 的用于存储 k-v 键…

每日五道java面试题之java基础篇(五)

第一题. final、finally、finalize 的区别&#xff1f; final ⽤于修饰变量、⽅法和类&#xff1a;final 修饰的类不可被继承&#xff1b;修饰的⽅法不可被重写&#xff1b;修饰的变量不可变。finally 作为异常处理的⼀部分&#xff0c;它只能在 try/catch 语句中&#xff0c;…

PyTorch深度学习实战(26)——多对象实例分割

PyTorch深度学习实战&#xff08;26&#xff09;——多对象实例分割 0. 前言1. 获取并准备数据2. 使用 Detectron2 训练实例分割模型3. 对新图像进行推断小结系列链接 0. 前言 我们已经学习了多种图像分割算法&#xff0c;在本节中&#xff0c;我们将学习如何使用 Detectron2 …

Netty Review - NioEventLoopGroup源码解析

文章目录 概述类继承关系源码分析小结 概述 EventLoopGroup bossGroup new NioEventLoopGroup(1); EventLoopGroup workerGroup new NioEventLoopGroup();这段代码是在使用Netty框架时常见的用法&#xff0c;用于创建两个不同的EventLoopGroup实例&#xff0c;一个用于处理连…

【计算几何】确定两条连续线段向左转还是向右转

确定两条连续线段向左转还是向右转 目录 一、说明二、算法2.1 两点的叉积2.2 两个段的叉积 三、旋转方向判别3.1 左转3.2 右转3.3 共线判别 一、说明 如果是作图&#xff0c;或者是判别小车轨迹。为了直观地了解&#xff0c;从当前点到下一个点过程中&#xff0c;什么是左转、…

树莓派4B(Raspberry Pi 4B)使用docker搭建阿里巴巴sentinel服务

树莓派4B&#xff08;Raspberry Pi 4B&#xff09;使用docker搭建阿里巴巴sentinel服务 由于国内访问不了docker hub&#xff0c;而国内镜像仓库又没有适配树莓派ARM架构的sentinel镜像&#xff0c;所以我们只能退而求其次——自己动手构建镜像。本文基于Ubuntu&#xff0c;Jav…

springboot169基于vue的工厂车间管理系统的设计

基于VUE的工厂车间管理系统设计与实现 摘 要 社会发展日新月异&#xff0c;用计算机应用实现数据管理功能已经算是很完善的了&#xff0c;但是随着移动互联网的到来&#xff0c;处理信息不再受制于地理位置的限制&#xff0c;处理信息及时高效&#xff0c;备受人们的喜爱。本…

书生谱语-大语言模型测试demo

课程内容简介 1.作业 demo1 demo2 demo3 demo4

Makefile编译原理 make 中的路径搜索_1

一.make中的路径搜索 问题&#xff1a;在实际的工程项目中&#xff0c;所有的源文件和头文件都放在同一个文件夹中吗&#xff1f; 实验1 &#xff1a; VPATH 引子 mhrubuntu:~/work/makefile1/17$ ll total 28 drwxrwxr-x 4 mhr mhr 4096 Apr 22 00:46 ./ drwxrwxr-x 7 mhr m…

《UE5_C++多人TPS完整教程》学习笔记10 ——《P11 设置加入游戏会话(Setup for Joining Sessions)》

本文为B站系列教学视频 《UE5_C多人TPS完整教程》 —— 《P11 设置加入游戏会话&#xff08;Setup for Joining Sessions&#xff09;》 的学习笔记&#xff0c;该系列教学视频为 Udemy 课程 《Unreal Engine 5 C Multiplayer Shooter》 的中文字幕翻译版&#xff0c;UP主&…

Python远程控制工具的使用

本节我们对所编写的远程控制工具的功能进行测试。首先开启主控端程序&#xff0c; 如下所示&#xff1a; 接下来打开被控端程序。当被控端打开时&#xff0c;主控端会收到被控端的连接请 求。 开启被控端程序&#xff1a; 主控端接收到连接请求并显示被控端主机的信息&#xff…

MySQL-----DCL基础操作

▶ DCL简介 DCL英文全称是Data ControlLanguage(数据控制语言)&#xff0c;用来管理数据库用户、控制数据库的访问权限。 DCL--管理用户 ▶ 查询用户 use mysql; select * from user; ▶ 创建用户 ▶ 语法 create user 用户名主机名 identified by 密码 设置为在任意主机上访问…