Kafka API与SpringBoot调用

文章目录

      • 首先需要命令行创建一个名为cities的主题,并且创建该主题的订阅者。
  • 1、使用Kafka原生API
    • 1.1、创建spring工程
    • 1.2、创建发布者
    • 1.3、对生产者的优化
    • 1.4、批量发送消息
    • 1.5、创建消费者组
    • 1.6 消费者同步手动提交
    • 1.7、消费者异步手动提交
    • 1.8、消费者同异步手动提交
  • 2、SpringBoot Kafka
    • 2.1、定义发布者
      • 1、修改配置文件
      • 2、定义发布者处理器
    • 2.2、定义消费者
      • 1、修改配置文件
      • 2、定义消费者

首先需要命令行创建一个名为cities的主题,并且创建该主题的订阅者。

在这里插入图片描述

1、使用Kafka原生API

1.1、创建spring工程

在这里插入图片描述
导入依赖:
在这里插入图片描述

1.2、创建发布者

先创建一个发布者类OneProsucer:
(注意需要配置一下ip主机名映射:添加映射)

public class OneProducer {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    public OneProducer() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        // 创建消息记录(包含主题、消息本身)  (String topic, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");
        // 创建消息记录(包含主题、key、消息本身)  (String topic, K key, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
        // 创建消息记录(包含主题、partition、key、消息本身)  (String topic, Integer partition, K key, V value)
        ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
        producer.send(record);
    }
}

注意代码中的字符串kafka都是有对应的常量的,这里便于理解用原生字符串来来写。

一般情况下,我们可能无法记住这些参数名。为此,Kafka的ProducerConfig类提供了一系列的参数常量。例如:
bootstrap.servers 可替换为 ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
key.serializer 可替换为 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
value.serializer 可替换为 ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG

api生产的消息与命令行消息的区别:

参考:Kafka生产者

再创建一个测试类:

public class OneProducerTest {

    public static void main(String[] args) throws IOException {
        OneProducer producer = new OneProducer();
        producer.sendMsg();
        System.in.read();
    }
}

xshell启动主题为cities的一个消费者:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.255.212:9092 --topic cities --from-beginning

启动生产者测试类生产消息:
在这里插入图片描述
查看linux端消费者,可以看到消息:
在这里插入图片描述
3台主机消费者都可以收到。

1.3、对生产者的优化

对于上一小节,有两个不舒服的点:

  1. 生产者端启动后控制台没有任何输出,只能通过看消费端消息才确认发送接收成功;
  2. 生产消息,指定分区的测试

这里可以使用回调方式,发送成功后,触发回调方法,生产端返回提示。

创建发布者类(修改senMsg方法):

public class TwoProducer {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    public TwoProducer() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        // 创建消息记录(包含主题、消息本身)  (String topic, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "tianjin");
        // 创建消息记录(包含主题、key、消息本身)  (String topic, K key, V value)
        // ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 1, "tianjin");
        // 创建消息记录(包含主题、partition、key、消息本身)  (String topic, Integer partition, K key, V value)
        ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", 2, 1, "tianjin");
        producer.send(record, (metadata, ex) -> {
            System.out.println("topic = " + metadata.topic());
            System.out.println("partition = " + metadata.partition());
            System.out.println("offset = " + metadata.offset());
        });
    }
}

创建测试类:

public class TwoProducerTest {

    public static void main(String[] args) throws IOException {
        TwoProducer producer = new TwoProducer();
        producer.sendMsg();
        System.in.read();
    }
}

启动运行:
在这里插入图片描述
消费端:
在这里插入图片描述
再次生产消息,偏移量变为1:
在这里插入图片描述
但是到目前为止,生产者一次只能发送一条消息,接下来看生产者批量发送消息。

1.4、批量发送消息

创建发布者类:

public class SomeProducerBatch {
    // 第一个泛型:当前生产者所生产消息的key
    // 第二个泛型:当前生产者所生产的消息本身
    private KafkaProducer<Integer, String> producer;

    public SomeProducerBatch() {
        Properties properties = new Properties();
        // 指定kafka集群
        properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092,kafka03:9092");
        // 指定key与value的序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 指定生产者每10条向broker发送一次
        properties.put("batch.size", 10);
        // 指定生产者每50ms向broker发送一次
        properties.put("linger.ms", 50);

        this.producer = new KafkaProducer<Integer, String>(properties);
    }

    public void sendMsg() {
        for(int i=0; i<50; i++) {
            ProducerRecord<Integer, String> record = new ProducerRecord<>("cities", "city-" + i);
            int k = i;
            producer.send(record, (metadata, ex) -> {
                System.out.println("i = " + k);
                System.out.println("topic = " + metadata.topic());
                System.out.println("partition = " + metadata.partition());
                System.out.println("offset = " + metadata.offset());
            });
        }
    }
}

注意:

  1. batch.size
  2. lingger.ms
    如果50ms没产生50条,时间到了也发消息。

创建一个测试类:

public class ProducerBatchTest {

    public static void main(String[] args) throws IOException {
        SomeProducerBatch producer = new SomeProducerBatch();
        producer.sendMsg();
        System.in.read();
    }
}

本身send方法执行了50次,但是并不是每一次都发送,仅仅是生产了50条消息;发送是按照上面的设置每10条向broker发送一次或者每50ms发送一次。
(分区是轮询的):

i = 0
topic = cities
partition = 0
offset = 2
i = 3
topic = cities
partition = 0
offset = 3
i = 1
topic = cities
partition = 2
offset = 2
i = 4
topic = cities
partition = 2
offset = 3
i = 6
topic = cities
partition = 0
offset = 4
i = 9
topic = cities
partition = 0
offset = 5
i = 7
topic = cities
partition = 2
offset = 4
i = 10
topic = cities
partition = 2
offset = 5
i = 12
topic = cities
partition = 0
offset = 6
i = 15
topic = cities
partition = 0
offset = 7
i = 13
topic = cities
partition = 2
offset = 6
i = 16
topic = cities
partition = 2
offset = 7
i = 18
topic = cities
partition = 0
offset = 8
i = 21
topic = cities
partition = 0
offset = 9
i = 24
topic = cities
partition = 0
offset = 10
i = 27
topic = cities
partition = 0
offset = 11
i = 19
topic = cities
partition = 2
offset = 8
i = 22
topic = cities
partition = 2
offset = 9
i = 30
topic = cities
partition = 0
offset = 12
i = 33
topic = cities
partition = 0
offset = 13
i = 36
topic = cities
partition = 0
offset = 14
i = 39
topic = cities
partition = 0
offset = 15
i = 42
topic = cities
partition = 0
offset = 16
i = 45
topic = cities
partition = 0
offset = 17
i = 25
topic = cities
partition = 2
offset = 10
i = 28
topic = cities
partition = 2
offset = 11
i = 31
topic = cities
partition = 2
offset = 12
i = 34
topic = cities
partition = 2
offset = 13
i = 37
topic = cities
partition = 2
offset = 14
i = 40
topic = cities
partition = 2
offset = 15
i = 43
topic = cities
partition = 2
offset = 16
i = 46
topic = cities
partition = 2
offset = 17
i = 48
topic = cities
partition = 0
offset = 18
i = 49
topic = cities
partition = 2
offset = 18
i = 2
topic = cities
partition = 1
offset = 0
i = 5
topic = cities
partition = 1
offset = 1
i = 8
topic = cities
partition = 1
offset = 2
i = 11
topic = cities
partition = 1
offset = 3
i = 14
topic = cities
partition = 1
offset = 4
i = 17
topic = cities
partition = 1
offset = 5
i = 20
topic = cities
partition = 1
offset = 6
i = 23
topic = cities
partition = 1
offset = 7
i = 26
topic = cities
partition = 1
offset = 8
i = 29
topic = cities
partition = 1
offset = 9
i = 32
topic = cities
partition = 1
offset = 10
i = 35
topic = cities
partition = 1
offset = 11
i = 38
topic = cities
partition = 1
offset = 12
i = 41
topic = cities
partition = 1
offset = 13
i = 44
topic = cities
partition = 1
offset = 14
i = 47
topic = cities
partition = 1
offset = 15

linux端:

city-1
city-4
city-7
city-10
city-0
city-3
city-6
city-9
city-13
city-16
city-19
city-22
city-25
city-28
city-31
city-34
city-37
city-40
city-12
city-15
city-18
city-21
city-24
city-27
city-30
city-33
city-36
city-39
city-42
city-45
city-43
city-46
city-49
city-48
city-2
city-5
city-8
city-11
city-14
city-17
city-20
city-23
city-26
city-29
city-32
city-35
city-38
city-41
city-44
city-47

1.5、创建消费者组

消费者类:

public class SomeConsumer extends ShutdownableThread {
    private KafkaConsumer<Integer, String> consumer;

    public SomeConsumer() {
        // 两个参数:
        // 1)指定当前消费者名称
        // 2)指定消费过程是否会被中断
        super("KafkaConsumerTest", false);

        Properties properties = new Properties();
        String brokers = "kafka01:9092,kafka02:9092,kafka03:9092";
        // 指定kafka集群
        properties.put("bootstrap.servers", brokers);
        // 指定消费者组ID
        properties.put("group.id", "cityGroup1");
        // 开启自动提交,默认为true
        properties.put("enable.auto.commit", "true");
        // 指定自动提交的超时时限,默认5s
        properties.put("auto.commit.interval.ms", "1000");
        // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
        // 认为消费者已经挂掉。默认为10s
        properties.put("session.timeout.ms", "30000");
        // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
        properties.put("heartbeat.interval.ms", "10000");
        // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
        // earliest:指定offset为第一条offset
        // latest: 指定offset为最后一条offset
        properties.put("auto.offset.reset", "earliest");
        // 指定key与value的反序列化器
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("cities"));
        // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。
        // 0,表示没有消息什么也不返回
        // >0,表示当时间到后仍没有消息,则返回空
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
        }
    }
}

测试类:

public class ConsumerTest {
    public static void main(String[] args) {
        SomeConsumer consumer = new SomeConsumer();
        consumer.start();
    }
}

启动运行,查看消费者控制台:

topic = cities
partition = 0
key = 1
value = tianjin
topic = cities
partition = 0
key = 1
value = tianjin
topic = cities
partition = 0
key = null
value = city-0
topic = cities
partition = 0
key = null
value = city-3
topic = cities
partition = 0
key = null
value = city-6
topic = cities
partition = 0
...

1.6 消费者同步手动提交

(1) 自动提交的问题
前面的消费者都是以自动提交 offset 的方式对 broker 中的消息进行消费的,但自动提交
可能会出现消息重复消费的情况。所以在生产环境下,很多时候需要对 offset 进行手动提交,
以解决重复消费的问题。

(2) 手动提交分类
手动提交又可以划分为同步提交、异步提交,同异步联合提交。这些提交方式仅仅是
doWork()方法不相同,其构造器是相同的。所以下面首先在前面消费者类的基础上进行构造
器的修改,然后再分别实现三种不同的提交方式。

创建创建消费者类 SyncManualConsumer

  • A、原理
    同步提交方式是,消费者向 broker 提交 offset 后等待 broker 成功响应。若没有收到响
    应,则会重新提交,直到获取到响应。而在这个等待过程中,消费者是阻塞的。其严重影响
    了消费者的吞吐量。

  • B、 修改构造器
    直接复制前面的 SomeConsumer,在其基础上进行修改。

public class SyncManualConsumer extends ShutdownableThread {
    private KafkaConsumer<Integer, String> consumer;

    public SyncManualConsumer() {
        // 两个参数:
        // 1)指定当前消费者名称
        // 2)指定消费过程是否会被中断
        super("KafkaConsumerTest", false);

        Properties properties = new Properties();
        String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092";
        // 指定kafka集群
        properties.put("bootstrap.servers", brokers);
        // 指定消费者组ID
        properties.put("group.id", "cityGroup1");

        // 开启手动提交
        properties.put("enable.auto.commit", "false");
        // 指定自动提交的超时时限,默认5s
        // properties.put("auto.commit.interval.ms", "1000");
        // 指定一次提交10个offset
        properties.put("max.poll.records", 10);

        // 指定消费者被broker认定为挂掉的时限。若broker在此时间内未收到当前消费者发送的心跳,则broker
        // 认为消费者已经挂掉。默认为10s
        properties.put("session.timeout.ms", "30000");
        // 指定两次心跳的时间间隔,默认为3s,一般不要超过session.timeout.ms的 1/3
        properties.put("heartbeat.interval.ms", "10000");
        // 当kafka中没有指定offset初值时,或指定的offset不存在时,从这里读取offset的值。其取值的意义为:
        // earliest:指定offset为第一条offset
        // latest: 指定offset为最后一条offset
        properties.put("auto.offset.reset", "earliest");
        // 指定key与value的反序列化器
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<Integer, String>(properties);
    }

    @Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("cities"));
        // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。
        // 0,表示没有消息什么也不返回
        // >0,表示当时间到后仍没有消息,则返回空
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
            // 手动同步提交
            consumer.commitSync();
        }
    }
}

创建测试类:

public class SyncManualTest {
    public static void main(String[] args) {
        SyncManualConsumer consumer = new SyncManualConsumer();
        consumer.start();
    }
}

1.7、消费者异步手动提交

(1) 原理
手动同步提交方式需要等待 broker 的成功响应,效率太低,影响消费者的吞吐量。异步提交方式是,消费者向 broker 提交 offset 后不用等待成功响应,所以其增加了消费者的吞吐量。

(2) 创建消费者类 AsyncManualConsumer

复制前面的 SyncManualConsumer 类,在其基础上进行修改。

public class AsynManualConsumer extends ShutdownableThread {
    private KafkaConsumer<Integer, String> consumer;

    public AsynManualConsumer() {
        ...
    }

    @Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("cities"));
        // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。
        // 0,表示没有消息什么也不返回
        // >0,表示当时间到后仍没有消息,则返回空
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
            // 手动异步提交
            // consumer.commitAsync();
            consumer.commitAsync((offsets, ex) -> {
                if(ex != null) {
                    System.out.print("提交失败,offsets = " + offsets);
                    System.out.println(", exception = " + ex);
                }
            });
        }
    }
}

启动类:

public class AsyncManualTest {
    public static void main(String[] args) {
        AsynManualConsumer consumer = new AsynManualConsumer();
        consumer.start();
    }
}

1.8、消费者同异步手动提交

(1) 原理
同异步提交,即同步提交与异步提交组合使用。一般情况下,若偶尔出现提交失败,其
也不会影响消费者的消费。因为后续提交最终会将这次提交失败的 offset 给提交了。
但异步提交会产生重复消费,为了防止重复消费,可以将同步提交与异常提交联合使用。
(2) 创建消费者类 SyncAsyncManualConsumer
复制前面的 AsyncManualConsumer 类,在其基础上进行修改。

@Override
    public void doWork() {
        // 订阅消费主题
        consumer.subscribe(Collections.singletonList("cities"));
        // 从broker摘取消费。参数表示,若buffer中没有消费,消费者等待消费的时间。
        // 0,表示没有消息什么也不返回
        // >0,表示当时间到后仍没有消息,则返回空
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for(ConsumerRecord record : records) {
            System.out.println("topic = " + record.topic());
            System.out.println("partition = " + record.partition());
            System.out.println("key = " + record.key());
            System.out.println("value = " + record.value());
            consumer.commitAsync((offsets, ex) -> {
                if(ex != null) {
                    System.out.print("提交失败,offsets = " + offsets);
                    System.out.println(", exception = " + ex);

                    // 同步提交
                    consumer.commitSync();
                }
            });
        }
    }

2、SpringBoot Kafka

新建一个简单案例,将发布者和订阅者定义到一个工程中。

创建一个SpringBoot工程,pom.xml添加如下依赖:

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
</dependencies>

2.1、定义发布者

Spring 是通过 KafkaTemplate 来完成对 Kafka 的操作的。

1、修改配置文件

# 自定义属性
kafka:
  topic: cities

# 配置Kafka
spring:
  kafka:
    bootstrap-servers: kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092
    # producer:   # 配置生产者
      # key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:   # 配置消费者
      group-id: group0  # 消费者组
      # key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

2、定义发布者处理器

@RestController
public class SomeProducer {
    @Autowired
    private KafkaTemplate<String, String> template;

    // 从配置文件读取自定义属性
    @Value("${kafka.topic}")
    private String topic;

    // 由于是提交数据,所以使用Post方式
    @PostMapping("/msg/send")
    public String sendMsg(@RequestParam("message") String message) {
        template.send(topic, message);
        return "send success";
    }
}

2.2、定义消费者

Spring 是通过监听方式实现消费者的。

1、修改配置文件

如上一小节,在配置文件中添加消费者配置内容。注意,Spring 中要求必须为消费者指定组。

2、定义消费者

Spring Kafka 是通过 KafkaListener 监听方式来完成消息订阅与接收的。当监听到有指定
主题的消息时,就会触发@KafkaListener 注解所标注的方法的执行

@Component
public class SomeConsumer {

    @KafkaListener(topics = "${kafka.topic}")
    public void onMsg(String message) {
        System.out.println("Kafka消费者接受到消息 " + message);
    }

}

run运行,postman访问接口输入消息:
在这里插入图片描述
消费者收到消息:
在这里插入图片描述
因为SpringBoot自动配置的原理,Kafka自动配置里:
在这里插入图片描述
在这里插入图片描述
默认就有了序列化,所以配置文件可以不用配置生产者的序列化。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/76046.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

优化 Linux 系统性能:探索 tuned 守护进程的调优配置文件

tuned守护进程调优系统调优配置文件从命令行配置系统调优安装、启用和启动tuned软件包tuned-adm 感谢 &#x1f496; hello大家好&#x1f60a; tuned守护进程调优系统 系统管理员可以基于多种用例工作负载来调整各种设备设置&#xff0c;以此优化系统性能。tuned 守护进程会利…

FreeRTOS(动态内存管理)

资料来源于硬件家园&#xff1a;资料汇总 - FreeRTOS实时操作系统课程(多任务管理) 目录 一、动态内存管理介绍 1、heap_1 2、heap_2 3、heap_3 4、heap_4 5、heap_5 二、动态内存总结与应用 1、heap_1 2、heap_4 3、heap_5 三、内存管理编程测试 1、heap_4 2、h…

第三方软件安全测评如何收费,安全测试包括哪些测试项?

近年来&#xff0c;随着全球范围内网络安全事件的频发&#xff0c;第三方软件安全测评的需求也日益增长。软件安全对于企业的重要性不言而喻&#xff0c;那么如何收费和可做测试项就成了企业最为关注的问题&#xff0c;小编将就以上问题作出以下简析。 一、第三方软件安全测评…

Electron基础篇

人生有些事,错过一时,就错过一世。 官网&#xff1a;简介 | Electron Electron-大多用来写桌面端软件 Electron介绍 Electront的核心组成是Chromium、Node.js以及内置的Native API&#xff0c;其中Chromium为Electron提供强大的UI能力&#xff0c;可以在不考虑兼容的情况下利…

阿里云服务器部署RabbitMQ流程

阿里云百科分享使用阿里云服务器部署RabbitMQ流程&#xff0c;RabbitMQ是实现了高级消息队列协议&#xff08;AMQP&#xff09;的开源消息代理软件&#xff0c;用于在分布式系统中存储转发消息&#xff0c;有良好的易用性、扩展性和高可用性。本文介绍如何通过ECS实例部署Rabbi…

使用python读Excel文件并写入另一个xls模版

效果如下&#xff1a; 原文件内容 转化后的内容 大致代码如下&#xff1a; 1. load_it.py #!/usr/bin/env python import re from datetime import datetime from io import BytesIO from pathlib import Path from typing import List, Unionfrom fastapi import HTTPExcep…

【开发笔记】在Python中调用Docker,并运行SDK任务

目录 1 背景2 环境准备3 实现流程3.1 连接远程Docker3.1 创建容器3.2 解压SDK3.3 挂载容器卷3.4 运行任务3.5 判断任务状态3.6 容器的停止与销毁 4 可能遇到的问题 1 背景 使用Python&#xff0c;在远程Docker中创建一个容器&#xff0c;并在该容器中运行SDK任务 2 环境准备 …

Linux系列讲解 —— FTP协议的应用

简单介绍一下FTP文件传输协议在linux系统中的应用。 目录 0. 基本概念1. FTP Server1.1 安装FTP Server1.2 FTP Server开启和关闭1.3 查看FTP Server是否开启1.4 FTP服务器配置 2. FTP Client2.1 lftp2.2 ftp2.3 sftp2.4 文件资源管理器集成的ftp和sftp 3. ftp常用命令 0. 基本…

下一代计算:嵌入AI的云/雾/边缘/量子计算

计算系统在过去几十年中推动了计算机科学的发展&#xff0c;现在已成为企业世界的核心&#xff0c;提供基于云计算、雾计算、边缘计算、无服务器计算和量子计算的服务。现代计算系统解决了现实世界中许多需要低延迟和低响应时间的问题。这有助于全球各地的青年才俊创办初创企业…

如何安装Python?

如何安装Python&#xff1f; 安装Python非常简单&#xff0c;让我们一步步来进行。 1. 访问官方网站 首先&#xff0c;您需要访问Python官方网站&#xff08;https://www.python.org/&#xff09;。在首页上&#xff0c;您会看到一个大大的「Downloads」按钮&#xff0c;点击…

分布式 - 消息队列Kafka:Kafka消费者和消费者组

文章目录 1. Kafka 消费者是什么&#xff1f;2. Kafka 消费者组的概念&#xff1f;3. Kafka 消费者和消费者组有什么关系&#xff1f;4. Kafka 多个消费者如何同时消费一个分区&#xff1f; 1. Kafka 消费者是什么&#xff1f; 消费者负责订阅Kafka中的主题&#xff0c;并且从…

Python的变量命名规则是什么?

Python的变量命名规则 在Python中&#xff0c;变量是用来存储数据的&#xff0c;而变量命名是为了方便我们理解和引用这些数据。Python的变量命名规则相对灵活&#xff0c;但也有一些基本规则和约定&#xff0c;让我们一起来了解一下。 基本规则 只能包含字母、数字和下划线&…

一种多策略下RabbitMQ的延时队列实现

1.为什么会用到延时队列? 场景: 最近在开发一款系统中遇到这样一个场景,A系统开通套餐需要把套餐信息以邮件的形式发送给相关工作人员,经过人工审核通过后,在B系统里面开通,A系统会调B系统套餐列表接口查询套餐是否开通成功,开通成功则从A系统去完成订单,假如超过设定时间未开…

Unity 实现2D地面挖洞!涂抹地形(碰撞部分,方法二)

文章目录 前言一、初始化虚拟点1.1点结构:1.2每个点有的状态:1.3生成点结构: 二、实例化边缘碰撞盒2.1计算生成边缘碰撞盒 三、涂抹部分3.1.虚拟点3.2.鼠标点3.3.内圈3.4.外圈 四、关于优化结语: 前言 老规矩先上效果图 继上一篇涂抹地形文章讲解发出后&#xff0c;有不少网友…

Docker中MySQL应用部署操作步骤

在linux系统下安装mysql、安装redis是非常麻烦的&#xff0c;但是docker出现后&#xff0c;应用安装会非常简洁。 1.MySQL部署 2.docker中部署mysql的步骤 创建mysql容器 这样mysql就部署好了。 外部机器连接docker中部署的mysql

Intel 12代酷睿集体大降价!三折太离谱了

之前有德国媒体报道称&#xff0c;Intel 12/13代酷睿以及即将发布的14代酷睿&#xff0c;将会全面涨价。 没想到&#xff0c;12代酷睿大降价了&#xff0c;幅度相当不可思议&#xff0c;不过至少目前仅限美国市场&#xff0c;新蛋、亚马逊、MicroCenter等大型零售商集体行动。 …

Qt扫盲-QTableView理论总结

QTableView理论总结 一、概述二、导航三、视觉外观四、坐标系统五、示例代码1. 性别代理2. 学生信息模型3. 对应视图 一、概述 QTableView实现了一个tableview 来显示model 中的元素。这个类用于提供之前由QTable类提供的标准表&#xff0c;但这个是使用Qt的model/view架构提供…

四张图片道清AI大模型的发展史(1943-2023)

四张图片道清AI大模型的发展史(1943-2023) 现在最火的莫过于GPT了&#xff0c;也就是大规模语言模型(LLM)。“LLM” 是 “Large Language Model”&#xff08;大语言模型&#xff09;的简称&#xff0c;通常用来指代具有巨大规模参数和复杂架构的自然语言处理模型&#xff0c;…

代码随想录算法训练营第58天|动态规划part15|392.判断子序列、115.不同的子序列

代码随想录算法训练营第58天&#xff5c;动态规划part15&#xff5c;392.判断子序列、115.不同的子序列 392.判断子序列 392.判断子序列 思路&#xff1a; &#xff08;这道题也可以用双指针的思路来实现&#xff0c;时间复杂度也是O(n)&#xff09; 这道题应该算是编辑距…

java.lang.NoClassDefFoundError: org/apache/tez/dag/api/TezConfiguration

错误&#xff1a; java.lang.NoClassDefFoundError: org/apache/tez/dag/api/TezConfigurationat org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolSession$AbstractTriggerValidator.startTriggerValidator(TezSessionPoolSession.java:74)at org.apache.hadoop.hive.ql.e…