# Kafka_深入探秘者(2):kafka 生产者

Kafka_深入探秘者(2):kafka 生产者

一、kafka 消息发送流程解析

1、kafka :java 客户端 数据生产流程解析

在这里插入图片描述

二、kafka 发送类型

1、kafka 发送类型–发送即忘记:producer.send(record) 同步发送


//通过 send() 发送完消息后返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 kafka 响应
//如果 kafka 正常响应,返回一个 Recordetadata 对象,该对象存储消息的偏移量
//如果 kafka 发生错误,无法正常响应,就会抛出异常,我们便可以进行异常处理 producer.send(record).get();

2、在 kafka_learn 工程中,修改 生产者 ProducerFastStart.java 类 ,获取发送类型。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java
 *
 *  2024-6-21 创建 生产者 ProducerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    public static void main( String[] args ) {

        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数 -- 优化代码
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值序列化器 -- 优化代码
        //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "kafka发送类型(同步发送)-2024-6-21-kafka-test!");
        try{
//            producer.send(record);
            //发送类型--同步发送
            Future<RecordMetadata> send = producer.send(record);
            RecordMetadata recordMetadata = send.get();
            System.out.println("topic: " + recordMetadata.topic());
            System.out.println("partition: " + recordMetadata.partition());
            System.out.println("offset: " + recordMetadata.offset());
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

在这里插入图片描述

3、kafka 发送类型–异步发送,相当于重新启动一个线程发送消息。


//发送类型--异步发送
producer.send(record, new Callback() {
	public void onCompletion(RecordMetadata metadata, Exception exception) {
		if (exception == null) {
			System.out.println("topic: " + metadata.topic());
			System.out.println("partition: " + metadata.partition());
			System.out.println("offset: " + metadata.offset());
		}
	}
});

4、在 kafka_learn 工程中,修改 生产者 ProducerFastStart.java 类 ,进行异步发送。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java
 *
 *  2024-6-21 创建 生产者 ProducerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;

import org.apache.kafka.clients.producer.*;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.Future;

public class ProducerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    public static void main( String[] args ) {

        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数 -- 优化代码
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值序列化器 -- 优化代码
        //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "kafka异步发送-2024-6-21-kafka-test!");
        try{
//            producer.send(record);
//            //发送类型--同步发送
//            Future<RecordMetadata> send = producer.send(record);
//            RecordMetadata recordMetadata = send.get();
//            System.out.println("topic: " + recordMetadata.topic());
//            System.out.println("partition: " + recordMetadata.partition());
//            System.out.println("offset: " + recordMetadata.offset());

            //发送类型--异步发送
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception == null) {
                        System.out.println("topic: " + metadata.topic());
                        System.out.println("partition: " + metadata.partition());
                        System.out.println("offset: " + metadata.offset());
                    }
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

在这里插入图片描述

三、kafka 序列化器

1、序列化器

消息要到网络上进行传输,必须进行序列化,而序列化器的作用就是如此。

Kafka 提供了默认的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),
还有整型(IntegerSerializer)和字节数组(BytesSerializer)序列化器,

这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer)
基本上能够满足大部分场景的需求。

2、在 kafka_learn 工程中,创建 Company.java 对象类


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\Company.java
 *
 *  2024-6-21 创建 Company.java 对象类
 */
package djh.it.kafka.learn.chapter2;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
    private String name;
    private String address;

    public String getName() {
        return name;
    }

    public void setName( String name ) {
        this.name = name;
    }
    public String getAddress(){
        return address;
    }

    public void setAddress( String address ) {
        this.address = address;
    }
}

3、自定义序列化器

见代码库: com.heima.kafka.chapter2.companySerializer

在 kafka_learn 工程中,自定义序列化器 companySerializer.java


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\companySerializer.java
 *
 *  2024-6-21 自定义序列化器 companySerializer.java
 *  主要处理针对 自定义类
 */
package djh.it.kafka.learn.chapter2;

import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class companySerializer implements Serializer<Company> {
    @Override
    public void configure( Map<String, ?> configs, boolean isKey ) {

    }

    @Override
    public byte[] serialize( String topic, Company data ) {
        if(data == null){
            return null;
        }
        byte[] name, address;
        try{
            if(data.getName() != null){
                name = data.getName().getBytes("UTF-8");

            }else{
                name = new byte[0];
            }
            if(data.getAddress() != null){
                address = data.getAddress().getBytes("UTF-8");
            }else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        }catch (UnsupportedEncodingException e){
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {

    }
}

四、kafka 分区器

1、kafka 分区器

本身 kafka 有自己的分区策略的,如果未指定,就会使用默认的分区策略 Kafka 根据传递消息的 key 来进行分区的分配,即 hash(key)%numPartitions。如果 Key 相同的话,那么就会分配到统一分区。

2、源代码 org.apache.kafka.clients.producer.internals.DefaultPartitioner.java 分析


/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

import java.util.Map;

/**
 * The default partitioning strategy:
 * <ul>
 * <li>If a partition is specified in the record, use it
 * <li>If no partition is specified but a key is present choose a partition based on a hash of the key
 * <li>If no partition or key is present choose the sticky partition that changes when the batch is full.
 * 
 * See KIP-480 for details about sticky partitioning.
 */
public class DefaultPartitioner implements Partitioner {

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
    }

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param numPartitions The number of partitions of the given {@code topic}
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    public void close() {}
    
    /**
     * If a batch completed for the current sticky partition, change the sticky partition. 
     * Alternately, if no sticky partition has been determined, set one.
     */
    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

3、也可以自定义分区器,在 kafka_learn 工程中,创建 自定义分区器 DefinePartitioner.java


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\DefinePartitioner.java
 *
 *  2024-6-21 创建 自定义分区器 DefinePartitioner.java
 */
package djh.it.kafka.learn.chapter2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class DefinePartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition( String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster ) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes){
            return counter.getAndIncrement() % numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void onNewBatch( String topic, Cluster cluster, int prevPartition ) {

    }

    @Override
    public void configure( Map<String, ?> map ) {

    }
}

五、kafka 拦截器

1、kafka 拦截器

Producer 拦截器 (interceptor) 是个相当新的功能,它和 consumer 端 interceptor 是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

2、生产者拦截器可以用在消息发送前做一些准备工作。

使用场景

  • 1、按照某个规则过滤掉不符合要求的消,息
  • 2、修改消息的内容
  • 3、统计类需求

3、源码 kafka 拦截器接口类 ProducerInterceptor.java


package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable {

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

    public void close();
}

4、在 kafka_learn 工程中,创建 自定义拦截器 ProducerinterceptorPrefix.java


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\ProducerInterceptorPrefix.java
 *
 *  2024-6-21 自定义拦截器 ProducerInterceptorPrefix.java
 */
package djh.it.kafka.learn.chapter2;


import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend( ProducerRecord<String, String> record ) {
        String modifiedValue = "prefix1-" + record.value();
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement( RecordMetadata metadata, Exception e ) {
        if(e == null){
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 发送成功率=" + String.format("%f", successRatio * 100) + "%");
    }

    @Override
    public void configure( Map<String, ?> configs ) {

    }
}

六、kafka 发送原理剖析总结

1、kafka 发送原理剖析

kafka 发送原理剖析.png

消息发送的过程中,涉及到两个线程协同工作,主线程首先将业务数据封装成 ProducerRecord 对象,之后调用 send0 方法将消息放入 RecordAccumulator (消息收集器,也,可以理解为主线程与 Sender 线程直接的缓冲区)中暂存,Sender 线程负青将消息信息构成请求,并最终执行网络 I/0 的线程,它从 RecordAccumulator 中取出消息并批量发送出去,需要注意的是,KafkaProducer 是线程安全的,多个线程间可以共享使用同一个 KafkaProducer 对象。

2、其他生产者参数:

2.1 acks

这个参数用来指定分区中必须有多少个副本收到这条消息,之后生产者才会认为这条消息时写入成功的。acks 是生产者客户端中非常重要的一个参数,它涉及到消息的可靠性和吞吐量之间的权衡。

  • ack=0,生产者在成功写入消息之前不会等待任何来自服务器的相应。如果出现问题生产者是感知不到的,消息就丢失了。不过因为生产者不需要等待服务器响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

  • ack=1,默认值为 1,只要集群的首领节点收到消息,生产这就会收到一个来自服务器的成功响应。如果消息无法达到首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。但是,这样还有可能会导致数据丢失,如果收到写成功通知,此时首领节点还没来的及同步数据到 follower 节点,首领节点崩溃,就会导致数据丢失。

  • ack=-1, 只有当所有参与复制的节点都收到消息时,生产这会收到一个来自服务器的成功响应,这种模式是最安全的,它可以保证不止一个服务器收到消息。

注意: acks 参数配置的是一个字符串类型,而不是整数类型,如果配置为整数类型会抛出以下异常。

2.2 retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,如果达到了 retires 设置的次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,可以通过 retry.backoff.ms 参数来修改这个时间间隔。

2.3 batch.size

当有多个消,息要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算,而不是消息个数。当批次被填满,批次里的所有消,息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消,息的批次也可能被发送。所以就算把 batch.size 设置的很大,也不会造成延迟,只会占用更多的内存而已,如果设置的太小,生产者会因为频繁发送消息而增加一些额外的开销。

2.4 max.request.size

该参数用于控制生产者发送的请求大小,它可以指定能发送的单个消息的最大值,也可以指单个请求里所有消息的总大小。broker 对可接收的消息最大值也有自己的限制 (message.max.size),所以两边的配置最好匹配,避免生产者发送的消息被 broker 拒绝。

3、在 kafka_learn 工程中,创建 KafkaProducerAnalysis.java 类,添加 自定义分区器、自定义拦截器 进行发送测试。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter2\KafkaProducerAnalysis.java
 *
 *  2024-6-21 创建 生产者 KafkaProducerAnalysis.java 类
 *
 *  自定义分区器、自定义拦截器 分析,进行发送测试
 */
package djh.it.kafka.learn.chapter2;

//注意导包,一定要导成 kafka 的序列化包

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaProducerAnalysis {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    public static void main( String[] args ) throws InterruptedException{

        Properties props = initNevConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

//        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-001", "kafka-自定义-分区DefinePartitioner");
        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo-001", "kafka-自定义拦截器ProducerInterceptorPrefix使用");
        try{
            //1、发送消息
            producer.send(record);
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }

    private static Properties initNevConfig() {
        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值序列化器 -- 优化代码
        //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");

        //自定义分区器使用
//        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,DefinePartitioner.class.getName());

        //自定义拦截器使用
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorPrefix.class.getName());

        //其他参数:acks 使用:
//        properties.put(ProducerConfig.ACKS_CONFIG, 0);  //error, 必须是字符串类型
//        properties.put(ProducerConfig.ACKS_CONFIG, "0");  //ok, 必须是字符串类型

        return properties;
    }
}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

上一节关联链接请点击
# Kafka_深入探秘者(1):初识 kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/735625.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Vue3组件】分享一下自己写的简约风格评论区组件

代码比较简单&#xff0c;方便大家二次开发&#xff0c;旨在快速提供基础的样式模板&#xff0c;自行迭代定制 预览 简介 通用评论组件 组件功能 此组件旨在创建一个具备嵌套回复能力的通用评论区域&#xff0c;适用于构建动态、互动性强的用户讨论场景。 接收数据结构 组件通…

Paimon Trino Presto的关系 分布式查询引擎

Paimon支持的引擎兼容性矩阵&#xff1a; Trino 是 Presto 同项目的不同版本&#xff0c;是原Faceboo Presto创始人团队核心开发和维护人员分离出来后开发和维护的分支&#xff0c;Trino基于Presto&#xff0c;目前 Trino 和 Presto 都仍在继续开发和维护。 Trino 生态系统-客…

虚拟机IP地址频繁变化的解决方法

勾八动态分配IP&#xff0c;让我在学习redis集群的时候&#xff0c;配置很多的IP地址&#xff0c;但是由于以下原因导致我IP频繁变动&#xff0c;报错让我烦恼&#xff01;&#xff01;&#xff01;&#xff01; 为什么虚拟机的IP地址会频繁变化&#xff1f; 虚拟机IP地址频繁…

webpack处理js资源10--webpack入门学习

处理 js 资源 有人可能会问&#xff0c;js 资源 Webpack 不能已经处理了吗&#xff0c;为什么我们还要处理呢&#xff1f; 原因是 Webpack 对 js 处理是有限的&#xff0c;只能编译 js 中 ES 模块化语法&#xff0c;不能编译其他语法&#xff0c;导致 js 不能在 IE 等浏览器运…

Zabbix+Garafana监控部署

ZabbixGarafana监控部署 一、IP规划 服务器IP备注zabbix-server192.168.100.128zabbix服务端zabbix-mysql192.168.100.130数据库zabbix-client192.168.100.132zabbix客户端garafana-server192.168.100.134Garafana 二、zabbix-server安装zabbix ​ 配置IP地址为&#xff1a…

俄语打招呼和问候的12种表达方式,柯桥俄语培训

- Как дела ? 近况如何&#xff1f; -Нормально, а ты как? 还行吧&#xff0c;你呢&#xff1f; Vol.2 -Как себя чувствуете? 你感觉如何&#xff1f; -Все замечательно! 一切都非常棒。 Vol.3 -Ка…

基于matlab的图像灰度化与图像反白

1原理 2.1 图像灰度化原理 图像灰度化是将彩色图像转换为灰度图像的过程&#xff0c;使得每个像素点仅包含一个灰度值&#xff0c;从而简化了图像的复杂度。灰度化原理主要可以分为以下几种方法&#xff1a; 亮度平均法 原理&#xff1a;将图像中每个像素的RGB值的平均值作为…

Vue40 修改默认配置

修改默认配置 在官网查看各个属性的作用 ### 在vue.config.js文件中&#xff0c;修改属性的值

计算机网络 静态路由及动态路由RIP

一、理论知识 1.静态路由 静态路由是由网络管理员手动配置在路由器上的固定路由路径。其优点是简单和对网络拓扑变化不敏感&#xff0c;缺点是维护复杂、易出错&#xff0c;且无法自动适应网络变化。 2.动态路由协议RIP RIP是一种基于距离向量的动态路由协议。它使用跳数作…

接口自动化拓展:Flask框架安装、介绍及工作中的应用!

Flask是一个轻量级的Python Web框架&#xff0c;用于构建Web应用程序和API。它简洁而灵活&#xff0c;容易上手&#xff0c;并且非常适合用于开发小型到中型规模的应用程序。在接口自动化测试中&#xff0c;Flask可以作为服务器框架&#xff0c;用于搭建测试接口。 本文将从零…

使用USI作为主SPI接口

代码; lcd_drive.c //***************************************************************************** // // File........: LCD_driver.c // // Author(s)...: ATMEL Norway // // Target(s)...: ATmega169 // // Compiler....: AVR-GCC 3.3.1; avr-libc 1.0 // // D…

UltraEditUEStudio软件安装包下载及安装教程

​根据软件大数据显示提供预定义的或使用者创建的编辑“环境”&#xff0c;能记住 UltraEdit 的所有可停靠窗口、工具栏等的状态。实际上我们可以这样讲HTML 工具栏&#xff0c;对常用的 HTML 功能作了预配置;文件加密/解密;多字节和集成的 IME。根据使用者情况表明Git Editor&…

day41--Redis(三)高级篇之最佳实践

Redis高级篇之最佳实践 今日内容 Redis键值设计批处理优化服务端优化集群最佳实践 1、Redis键值设计 1.1、优雅的key结构 Redis的Key虽然可以自定义&#xff0c;但最好遵循下面的几个最佳实践约定&#xff1a; 遵循基本格式&#xff1a;[业务名称]:[数据名]:[id]长度不超过…

【Linux系列】find命令使用与用法详解

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

动手学深度学习(Pytorch版)代码实践 -卷积神经网络-26网络中的网络NiN

26网络中的网络NiN import torch from torch import nn import liliPytorch as lp import matplotlib.pyplot as plt# 定义一个NiN块 def nin_block(in_channels, out_channels, kernel_size, strides, padding):return nn.Sequential(# 传统的卷积层nn.Conv2d(in_channels, ou…

兰州理工大学24计算机考研情况,好多专业都接受调剂,只有计算机专硕不接收调剂,复试线为283分!

兰州理工大学&#xff08;Lanzhou University of Technology&#xff09;&#xff0c;位于甘肃省兰州市&#xff0c;是甘肃省人民政府、教育部、国家国防科技工业局共建高校&#xff0c;甘肃省高水平大学和“一流学科”建设高校&#xff1b;入选国家“中西部高校基础能力建设工…

年薪50w+的项目经理,手把手教你如何复盘

复盘是一种重要的学习和改进工具&#xff0c;对于项目经理来说&#xff0c;能帮助识别项目中的成功与失败&#xff0c;为未来的项目管理提供宝贵经验。 理论部分 定义目标。在开始复盘之前&#xff0c;明确复盘的目标是什么。是为了找出项目中的问题并提出解决方案&#xff0c…

Open MMLab 之 MMDetection3D框架

MMDetection框架入门教程&#xff08;完全版&#xff09;-CSDN博客 OpenMMLab MMDetection是商汤和港中文大学针对目标检测任务推出的一个开源项目&#xff0c;它基于Pytorch实现了大量的目标检测算法&#xff0c;把数据集构建、模型搭建、训练策略等过程都封装成了一个个模块…

Chromium 调试指南2024 Mac篇 - 准备工作 (一)

1.引言 Chromium是一个由Google主导开发的开源浏览器项目&#xff0c;它为Google Chrome浏览器提供了基础框架。Chromium不仅是研究和开发现代浏览器技术的重要平台&#xff0c;还为众多其他基于Chromium的浏览器&#xff08;如Microsoft Edge、Brave等&#xff09;提供了基础…

基于Openmv的色块识别代码及注意事项

在给出代码之前我先说注意事项以及需要用到的函数 1、白平衡和自动增益的关闭 打开白平衡和自动增益会影响颜色识别的效果&#xff0c;具体影响体现在可能使你颜色阈值发生改变 关闭代码如下 sensor.set_auto_gain(False) #关闭自动增益 sensor.set_whitebal(False) …