【2024】kafka streams的详细使用与案例练习(2)

目录

  • 前言
  • 使用
    • 1、整体结构
      • 1.1、序列化
    • 2、 Kafka Streams 常用的 API
      • 2.1、 StreamsBuilder
      • 2.2、 KStream 和 KTable
      • 2.3、 filter和 filterNot
      • 2.4、 map 和 mapValues
      • 2.5、 flatMap 和 flatMapValues
      • 2.6、 groupByKey 和 groupBy
      • 2.7、 count、reduce 和 aggregate
      • 2.8、 join 和 leftJoin
      • 2.9、 to 和 toTable
      • 2.10、 foreach
    • 3、简单案例练习
      • 3.1、通过streams实现数据流处理,把字符串装为大写
      • 3.2、map 使用,把内容转为key,把value转为内容长度
      • 3.3、filter和 filterNot使用过滤消息
      • 3.4、通过selectKey配置key
    • 4、复杂对象案例练习
      • 4.1、自定义序列化
      • 4.2、发送消息
      • 4.3、通过KafkaStreams接收复杂对象,并且使用split

前言

上一篇文章已经对kafka streams进行了大致的介绍以及简单案例使用,这一片文章主要是对kafka streams常用API的介绍以及使用,如果需要看概念介绍的可以看
Kafka Streams详细介绍与具体使用

下面直接开始进行kafka streams使用操作

使用

1、整体结构

在我们整体处理streams时,总共就分为三部分

  1. 第一部分是创建配置,告诉kafka我们的连接信息,通过StreamsConfig传递,然后创建一个StreamsBuilder类用于构建kafka streams拓扑的主要类。该类主要用于定义数据流处理的拓扑结构。
    还有就是通过Serde进行一个序列化

  2. 第二部分主要是构建流处理拓扑,通过StreamsBuilder类得到源Processor处理器也就是KStream类,然后通过KStream的API方法得到不同的Processor处理器(重点

  3. 第三部分通过配置流处理拓扑,创建流处理实例,然后通过kafkaStreams.start()方法启动,结束时再通过kafkaStreams.close()关闭

我们在使用时,1和3基本上都是相同的,一般我们要处理的就是2,根据不同的业务需求,我们得到不同的Processor处理器,然后发送到不同的topic

1.1、序列化

在使用Kafka Streams时,我们需要把从topic接收到的消息进行序列化,然后再把返回topic的消息进行反序列化,
可以和kafka发送一样使用Properties类直接定义全部的,但这样会有很大的局限性,因为我们发送给不同的处理过的topic消息往往都会是不同的类型,所以我们会使用到Serde进行该操作,再每次和topic进行读取或写入消息时通过Serde进行指定key和消息的类型。
基础数据类型都有默认的写好的通过Serdes方法可以获得,如果需要定义复杂的则需要自己定义一个序列化和反序列化的类。
如下:

  • Serde<String> stringSerde = Serdes.String():String类型
  • Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer):自定义复杂类型
    • JsonSerializer<Purchase> serializer = new JsonSerializer<>(); :自定义的序列化类
    • JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);:反序列化类

kafka也有默认的可以直接用的:org.springframework.kafka.support.serializer.JsonSerializer,但一般建议自定义序列化

2、 Kafka Streams 常用的 API

2.1、 StreamsBuilder

StreamsBuilder 是构建 Kafka Streams 拓扑的主要类。它用于定义数据流处理的拓扑结构。

StreamsBuilder builder = new StreamsBuilder();

2.2、 KStream 和 KTable

  • KStream:表示一个无界的数据流,每个记录都是一个键值对。
KStream<String, String> stream = builder.stream("input-topic");
  • KTable:表示一个表,每个键对应一个最新的值。
KTable<String, Long> table = builder.table("input-topic");

2.3、 filter和 filterNot

  • filter:根据条件过滤记录。
KStream<String, String> filteredStream = stream.filter((key, value) -> value.contains("important"));
  • filterNot:过滤掉不符合条件的记录。
KStream<String, String> filteredStream = stream.filterNot((key, value) -> value.contains("unimportant"));

2.4、 map 和 mapValues

  • map:对每个记录的键和值进行转换。
KStream<String, Integer> mappedStream = stream.map((key, value) -> new KeyValue<>(key, value.length()));
  • mapValues:仅对记录的值进行转换。
KStream<String, Integer> mappedStream = stream.mapValues(value -> value.length());

2.5、 flatMap 和 flatMapValues

  • flatMap:将每个输入记录映射为零个或多个输出记录。
KStream<String, String> flatMappedStream = stream.flatMap((key, value) -> {
      List<KeyValue<String, String>> result = new ArrayList<>();
      for (String word : value.split(" ")) {
          result.add(new KeyValue<>(key, word));
      }
      return result;
  });
  • flatMapValues:仅将每个输入记录的值映射为零个或多个输出值。

****KStream<String, String> flatMappedStream = stream.flatMapValues(value -> Arrays.asList(value.split(" ")));

2.6、 groupByKey 和 groupBy

  • groupByKey:根据记录的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupByKey();
  • groupBy:根据新的键进行分组。
KGroupedStream<String, String> groupedStream = stream.groupBy((key, value) -> value);

2.7、 count、reduce 和 aggregate

  • count:计算每个分组中的记录数。
KTable<String, Long> countTable = groupedStream.count();
  • reduce:对每个分组中的记录进行归约操作。
KTable<String, String> reducedTable = groupedStream.reduce((aggValue, newValue) -> aggValue + newValue);
  • aggregate:自定义聚合操作。
  KTable<String, Integer> aggregatedTable = groupedStream.aggregate(
      () -> 0,
      (key, value, aggregate) -> aggregate + value.length(),
      Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("aggregated-store")
  );

2.8、 join 和 leftJoin

  • join:对两个流或表进行内连接操作。
  KStream<String, String> joinedStream = stream.join(
      otherStream,
      (value1, value2) -> value1 + value2,
      JoinWindows.of(Duration.ofMinutes(5))
  );
  • leftJoin:对两个流或表进行左连接操作。
  KStream<String, String> leftJoinedStream = stream.leftJoin(
      otherStream,
      (value1, value2) -> value1 + (value2 == null ? "" : value2),
      JoinWindows.of(Duration.ofMinutes(5))
  );

2.9、 to 和 toTable

  • to:将流中的记录写入到Kafka主题。
**stream.to("output-topic");**
  • toTable:将流转换为表。
  KTable<String, String> table = stream.toTable();

2.10、 foreach

对每个记录执行一个操作,但不返回新的流。

stream.foreach((key, value) -> System.out.println(key + ": " + value));

上面这些就是KafkaStreams常用到的一些API,通过组合使用这些API,你可以构建复杂的流处理拓扑,以满足各种数据处理需求。

3、简单案例练习

使用脚本

#先进入容器
docker exec -it kafka-server /bin/bash

#创建topic(把ip换为自己的)
/opt/kafka/bin/kafka-topics.sh --create --topic sell.purchase.transaction --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1

# 进入生产者发消息
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic input-topic


#进入消费者监听
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sell.purchase.transaction

3.1、通过streams实现数据流处理,把字符串装为大写

@Slf4j
public class KafkaStreamsYellingApp {
//    appid
    private final static String APPLICATION_ID = "yelling_app_id";
    private final static String INPUT_TOPIC = "input-topic";
    private final static String OUTPUT_TOPIC = "out-topic";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";


    public static void main(String[] args) throws InterruptedException {

        //1、配置客户端和序列化器
//        配置kafka stream属性连接
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象
        Serde<String> stringSerde = Serdes.String();
//        构建流处理拓扑(用于输出)
        StreamsBuilder builder = new StreamsBuilder();

        //2、构建流处理拓扑
//        数据源处理器:从指定的topic中取出数据
        KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//
        KStream<String, String> upperStream = inputStream
                .peek((key, value) -> {
                    log.info("[收集]key:{},value:{}", key, value);
                })
                .filter((key, value) -> value.length() > 5)
                .mapValues(time -> time.toUpperCase())
                .peek((key, value) -> log.info("[过滤结束]key:{},value:{}", key, value));
//        日志打印upperStream处理器的数据
        upperStream.print(Printed.toSysOut());
//        把upperStream处理器的数据输出到指定的topic中
        upperStream.to(OUTPUT_TOPIC, Produced.with(stringSerde, stringSerde));

        //3、通过配置流处理拓扑,创建流处理实例
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
//        jvm关闭时,把流也关闭
        CountDownLatch downLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            downLatch.countDown();
            log.info("关闭流处理");
        }));

        kafkaStreams.start();
        log.info("启动执行!");
    }
}


在这里插入图片描述

3.2、map 使用,把内容转为key,把value转为内容长度


@Slf4j
public class KafkaStreamsYellingApp2 {

    private final static String APPLICATION_ID = "yelling_app_id";
    private final static String INPUT_TOPIC = "input-topic";

    private final static String OUTPUT_TOPIC = "output-topic";

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";



    public static void main(String[] args) throws InterruptedException {

//        配置kafka stream属性连接
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象
        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();
//        构建流处理拓扑(用于输出)
        StreamsBuilder builder = new StreamsBuilder();
//        数据源处理器:从指定的topic中取出数据
        KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//

        KStream<String, Integer> k1 = inputStream.map((noKey, value) -> KeyValue.pair(value, value.length()));

        k1.print(Printed.<String,Integer>toSysOut().withLabel("map"));

//        k1.to(OUTPUT_TOPIC, Produced.with(stringSerde, integerSerde));

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);


//        jvm关闭时,把流也关闭
        CountDownLatch downLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            downLatch.countDown();
            log.info("关闭流处理");
        }));

        kafkaStreams.start();
        log.info("启动执行!");

    }
    
}

3.3、filter和 filterNot使用过滤消息

/**
 * filter和 filterNot使用
 */
@Slf4j
public class KafkaStreamsYellingApp3 {

    private final static String APPLICATION_ID = "yelling_app_id";
    private final static String INPUT_TOPIC = "input-topic";

    private final static String OUTPUT_TOPIC = "output-topic";

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";



    public static void main(String[] args) throws InterruptedException {

//        配置kafka stream属性连接
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象
        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();
//        构建流处理拓扑(用于输出)
        StreamsBuilder builder = new StreamsBuilder();
//        数据源处理器:从指定的topic中取出数据
        KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//
        inputStream.filter((key, value) -> (value.contains("kafka")), Named.as("filtering-processor"))
                        .print(Printed.<String,String>toSysOut().withLabel("filtering"));


        inputStream.filterNot((key, value) -> (value.contains("kafka")), Named.as("filtering-not-processor"))
                .print(Printed.<String,String>toSysOut().withLabel("filtering-not"));



        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);


//        jvm关闭时,把流也关闭
        CountDownLatch downLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            downLatch.countDown();
            log.info("关闭流处理");
        }));

        kafkaStreams.start();
        log.info("启动执行!");
    }
}

3.4、通过selectKey配置key

@Slf4j
public class KafkaStreamsYellingApp4 {


    private final static String APPLICATION_ID = "yelling_app_id";
    private final static String INPUT_TOPIC = "input-topic";

    private final static String OUTPUT_TOPIC = "output-topic";

    private final static String BOOTSTRAP_SERVERS = "localhost:9092";



    public static void main(String[] args) throws InterruptedException {

//        配置kafka stream属性连接
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        StreamsConfig streamsConfig = new StreamsConfig(properties);
//        配置键值对的序列化/反序列化Serdes对象
        Serde<String> stringSerde = Serdes.String();
        Serde<Integer> integerSerde = Serdes.Integer();
//        构建流处理拓扑(用于输出)
        StreamsBuilder builder = new StreamsBuilder();
//        数据源处理器:从指定的topic中取出数据
        KStream<String, String> inputStream = builder.stream(INPUT_TOPIC, Consumed.with(stringSerde, stringSerde));
//

         inputStream.flatMap( //把一个消息的内容通过转为多条消息以map的方式返回,
                (key, value) -> Arrays.stream(value.split(" ")) //通过使用java的stream把内容转为map
                        .map(e -> KeyValue.pair(e, e.length())).collect(Collectors.toList()))
                        .print(Printed.<String,Integer>toSysOut().withLabel("flatMap"));


        inputStream.flatMapValues( //不改变key,直接转换value
                value -> Arrays.stream(value.split(" "))
                        .map(String::toUpperCase).toList())
                .print(Printed.<String,String>toSysOut().withLabel("flatMapValues"));


//        配置key
        inputStream.selectKey((key, value) -> value)
                .print(Printed.<String,String>toSysOut().withLabel("selectKey"));



        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);


//        jvm关闭时,把流也关闭
        CountDownLatch downLatch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            downLatch.countDown();
            log.info("关闭流处理");
        }));

        kafkaStreams.start();
        log.info("启动执行!");

    }

}

4、复杂对象案例练习

4.1、自定义序列化

对接收topic的消息到拓扑还是发送消息到topic都需要进行序列化和反序列化

  • 序列化
public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson=  new Gson();



    public void configure(Map<String ,?> map, boolean b) {
    }



    public byte[] serialize(String topic, T t) {


        return gson.toJson(t).getBytes();
    }



    @Override
    public void close() {
        Serializer.super.close();
    }
}
  • 反序列化
/**
 * 反序列化
 * @param <T>
 */
public class JsonDeserializer<T> implements Deserializer<T> {
    private Gson gson=  new Gson();

    private Class<T> deserializeClass;
    public JsonDeserializer(Class<T> deserializeClass){
        this.deserializeClass=deserializeClass;
    }

    public JsonDeserializer(){

    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String,?> map, boolean b){
        if (deserializeClass == null){
            deserializeClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null){
            return null;
        }
        return gson.fromJson(new String(data),deserializeClass);
    }


    @Override
    public void close() {

    }
}

4.2、发送消息

/**
 * 生产消息到kafka
 */
@Slf4j
public class MyProducer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC_NAME = "production-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
//        设置参数
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
//        设置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//        连接客户端
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < 5; i++) {
            Purchase purchase = new Purchase();
            purchase.setId("12431234253");
            purchase.setDate(new Date().toString());
            purchase.setName("苹果");
            purchase.setPrice(100.0+i);
            String json = new JSONObject(purchase).toString();

            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, 0,"my-keyValue3", json);

//        同步
            send(producer,producerRecord);
        }

    }

    /**
     * @param producer: 客户端对象
     * @return void
     * 同步发送
     * @date 2024/3/22 17:09
     */
    private static void send(KafkaProducer<String, String> producer,ProducerRecord<String, String> producerRecord) throws InterruptedException, ExecutionException {

//          等待发送成功的阻塞方法
        RecordMetadata metadata = producer.send(producerRecord).get();

        log.info("同步发送消息"+ "topic-"+metadata.topic()+"====partition:"+metadata.partition()
                +"=====offset:"+metadata.offset());
    }
    
}

4.3、通过KafkaStreams接收复杂对象,并且使用split

split使用介绍

  • BranchedKStream<K, V> split(final Named named):split方法一个参数,该参数主要是用来定义分裂后的分支的一个统一前缀。会返回一个 BranchedKStream对象。

  • BranchedKStream:主要用来使用分裂校验的

    • branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched):会接收两个参数,前面的是Predicate用做校验判断作为判断条件的,后面的是一个Branched对象,用作处理满足该分支条件拆分出来的消息进行处理
    • defaultBranch(Branched<K, V> branched):作为结尾方法用作对不满足前面的全部branch条件的消息,进行一个最后处理
    • noDefaultBranch():结尾方法,表示对接下来的消息不做处理
  • Predicate:判断验证条件,满足该条件的消息会被拆分出来到当前分支

  • Branched:对分裂到当前分支的消息进行处理

    • as(final String name) :给该分支添加一个名字,不做处理。
    • withFunction():对该分支的消息进行处理,然后会把处理后的消息返回到结果的map中去
    • withConsumer():对该分支的消息进行处理,不会把消息返回回去

在这里插入图片描述

在这里插入图片描述

  • 具体代码

在测试使用前需要把几个topic先创建出来

@Slf4j
public class ZMartKafkaStreamsApp {

    private final static String APPLICATION_ID = "ZMart_app_id";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private final static String TOPIC_NAME = "production-topic";

    private final static String APPLE_TOPIC_NAME = "apple-topic";

    private final static String WATERMELON_TOPIC_NAME = "watermelon-topic";
    private final static String OUT_TOPIC_NAME = "out-topic";

    public static void main(String[] args) throws InterruptedException {

        StreamsConfig streamsConfig = new StreamsConfig(getProperties());
        JsonSerializer<Purchase> serializer = new JsonSerializer<>();

        JsonDeserializer<Purchase> deserializer = new JsonDeserializer<>(Purchase.class);

        Serde<Purchase> propertiesSerde = Serdes.serdeFrom(serializer, deserializer);
        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Purchase> inputStream = builder.stream(TOPIC_NAME, Consumed.with(stringSerde, propertiesSerde));



//        谓词条件 判断把流输送到哪个分支上
        Predicate<String, Purchase> isWatermelon = (key, value) -> {
            String name = value.getName();
            return name.equals("西瓜");
        };


//          split:分裂流
        Map<String, KStream<String, Purchase>> stringKStreamMap = inputStream
                .peek((k,v)-> log.info("[分裂消息===>] k:{},value:{}" ,k,v))
                .split(Named.as("split-"))//拼接的key前缀
                .branch(isWatermelon, Branched.withFunction(ks -> {  //对满足isWatermelon条件的分支的消息进行处理的处理器
//                    ks.print(Printed.<String, Purchase>toSysOut().withLabel("西瓜分支"));
                    return  ks.mapValues(v -> {
                        String modifiedNumber = v.getId().replaceAll("(?<=\\d)\\d(?=\\d)", "*");
                        v.setId( modifiedNumber);
                        return v;
                    });
                },"watermelon")) //这个地方需要指定name,用作为返回的map的key 如该分支存放到返回的map的key为:split-watermelon
                .branch((key, value) -> value.getName().equals("苹果"),
                        Branched.withConsumer( //不满足上面的条件的会继续向下匹配,满足条件直接发送
                            ks -> ks
                                .peek((k, v) -> log.info("[苹果分支] value:{}" ,v))
                                .to(APPLE_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"apple"))
                .defaultBranch(Branched.withConsumer( //把不满足前面所以branch条件的消息发送到默认主题
                        ks -> ks.to(OUT_TOPIC_NAME, Produced.with(stringSerde, propertiesSerde)),"defaultBranch"));



//   把上面的消息打印出来
        stringKStreamMap.forEach((s, kStream) ->
                kStream.print(Printed.<String, Purchase>toSysOut().withLabel(s)));


        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfig);
//        jvm关闭时,把流也关闭
        CountDownLatch latch = new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            latch.countDown();
            log.info("The Kafka Streams 执行关闭!");
        }));

        kafkaStreams.start();
        log.info("kafka streams 启动成功!>>>>");
        latch.await();


    }


    @NotNull
    private static Properties getProperties() {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
        return properties;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/722604.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深圳比创达|EMI电磁干扰行业:从挑战到机遇的蜕变

在当今科技日新月异的时代&#xff0c;电磁干扰&#xff08;EMI&#xff09;已成为影响电子设备性能和稳定性的重要因素。EMI电磁干扰行业因此应运而生&#xff0c;致力于研究和解决电磁干扰问题&#xff0c;确保电子设备的正常运行。 一、EMI电磁干扰行业面临的挑战 随着电子…

Java学习 (一) 环境安装及入门程序

一、安装java环境 1、获取软件包 https://www.oracle.com/java/technologies/downloads/ .exe 文件一路装过去就行&#xff0c;最好别装c盘 &#xff0c;我这里演示的时候是云主机只有C盘 2、配置环境变量 我的电脑--右键属性--高级系统设置--环境变量 在环境变量中添加如下配…

Pycharm的基础使用

Pycharm的基础使用 一、修改主题 第一步&#xff1a;点击file->settings 第二步&#xff1a;找到Appearance&Behavior->Appearance->Theme选择主题 有五种主题可以选 二、修改默认字体和大小 第一步&#xff1a;打开设置与上面修改主题第一步一样&#xff1b…

可以把 FolkMQ 内嵌到 SpringBoot3 项目里(可内嵌的消息中间件)

之前发了《把 FolkMQ 内嵌到 SpringBoot2 项目里&#xff08;比如 “诺依” 啊&#xff09;》。有人说都淘态了&#xff0c;有什么好内嵌的。。。所以再发个 SpringBoot3 FolkMQ 是一个 “纯血国产” 的消息中间件。支持内嵌、单机、集群、多重集群等多种部署方式。 内嵌版&am…

【学习】程序员资源网站

1 书栈网 简介&#xff1a;书栈网是程序员互联网IT开源编程书籍、资源免费阅读的网站&#xff0c;在书栈网你可以找到很多书籍、笔记资源。在这里&#xff0c;你可以根据热门收藏和阅读查看大家都在看什么&#xff0c;也可以根据技术栈分类找到对应模块的编程资源&#xff0c;…

elasticsearch hanlp插件远程词典配置

elasticsearch hanlp插件远程词典配置 背景远程词典配置新增远程词典文件修改hanlp-remote.xml自动加载词典 远程词典测试 背景 在使用elasticsearch的过程中&#xff0c;总会遇到与分词相关的需求&#xff0c;这里将针对常用的elasticsearch hanlp&#xff08;后面统称为 es …

uniapp app一键登录

一键登录不需要单独写页面&#xff0c;uniapp 有原生的页面 第一步&#xff0c;登录Dcloud后台》我的应用》点击应用名称 填写完点击 uniCloud模块新建一个服务空间》选择免费 , 创建完点击一键登录&#xff0c;添加应用&#xff0c;这个需要审核&#xff0c;“大概一天左右”…

做动画?Animatediff 和 ComfyUI 更配哦!

如果从工作流和内存利用率的角度来说&#xff0c;Animatediff 和 ComfyUI 可能更配一些&#xff0c;毕竟制作动画是一个很吃内存的操作。 首先&#xff0c;我们需要在管理器中下载 Animatediff 插件&#xff0c;当然也可以直接导入听雨的工作流&#xff0c;然后在管理器的安装…

欢迎 Stable Diffusion 3 加入 Diffusers

作为 Stability AI 的 Stable Diffusion 家族最新的模型&#xff0c;Stable Diffusion 3(SD3) 现已登陆 Hugging Face Hub&#xff0c;并且可用在 &#x1f9e8; Diffusers 中使用了。 Stable Diffusion 3https://stability.ai/news/stable-diffusion-3-research-paper 当前放出…

Folx Mac版软件下载-Folx 2024最新版-下载工具附加详细安装步骤

​根据大数据调查表明从网络下载视频&#xff1a;用Folx从网页上下载视频&#xff0c;能够设置下载视频的格式&#xff0c;你也能够下载年龄限制和私人视频&#xff0c;当你不需要视频&#xff0c;只想要一个音轨的时候&#xff0c;Folx是非常有用的!这个互联网下载器所有的视频…

Multisim软件仿真之频谱分析仪

网络上有很多Multisim文件&#xff0c;有些是不能复现的&#xff0c;比如频谱仪&#xff0c;按照下面链接去操作&#xff0c;怎么也测试不出来波形&#xff0c;multisim频谱仪使用_multisim输入输出端口-CSDN博客。 原因分析&#xff1a; 1、博主设置参数未讲全&#xff0c;按…

第29讲:Ceph集群使用RBD块存储设备与K8S的PV集成

文章目录 1.Ceph集群使用RBD块存储与K8S集成简介2.Ceph集群RBD块存储与K8S PV存储卷集成2.1.创建K8S集群PV使用的块存储2.2.创建K8S集群访问RBD块存储设备的认证用户2.3.将认证用户的Key存储在K8S Secret资源中2.4.在K8S集群的所有节点中安装Ceph命令2.5.创建PV及PVC资源使用RB…

Python酷库之旅-比翼双飞情侣库(16)

目录 一、xlwt库的由来 1、背景和需求 2、项目启动 3、功能特点 4、版本兼容性 5、与其他库的关系 6、示例和应用 7、发展历史 二、xlwt库优缺点 1、优点 1-1、简单易用 1-2、功能丰富 1-3、兼容旧版Excel 1-4、社区支持 1-5、稳定性 2、缺点 2-1、不支持.xls…

AI 生成文本工具推荐(AI 对话/AI 聊天机器人/AI 写作)

① boardmix AI boardmix AI&#xff0c;是一个在线的智能 AI 对话 App&#xff0c;打开浏览器即可在线使用&#xff0c;支持 AI 多轮连续对话&#xff0c;提供 AI 角色切换、AI 多语言翻译、一键唤出、可视化表达及多人协作功能。 boardmix AI 预置了多个不同的 AI 角色&…

Red Hat Ansible Automation Platform架构

目录 示例架构&#xff1a;一、Ansible Automation Platform 实现流程详解1. 自动化控制器 (Automation Controller)2. 自动化网格 (Automation Mesh)3. 私有自动化中心 (Private Automation Hub)4. Event-Driven Ansible 控制器5. 数据存储 (PostgreSQL 数据库) 二、实现流程1…

计算机SCI期刊,中科院2区TOP,收稿范围广泛!

一、期刊名称 IEEE Transactions on Automation Science and Engineering 二、期刊简介概况 期刊类型&#xff1a;SCI 学科领域&#xff1a;计算机科学 影响因子&#xff1a;5.6 中科院分区&#xff1a;2区top 三、期刊征稿范围 IEEE Transactions on Automation Science…

TCP/IP协议,三次握手,四次挥手,常用的协议

IP - 网际协议 IP 负责计算机之间的通信。 IP 负责在因特网上发送和接收数据包。 HTTP - 超文本传输协议 HTTP 负责 web 服务器与 web 浏览器之间的通信。 HTTP 用于从 web 客户端&#xff08;浏览器&#xff09;向 web 服务器发送请求&#xff0c;并从 web 服务器向 web …

汇编:masm伪指令

条件判断语句 32位汇编语言中&#xff0c;伪指令&#xff08;如.IF、.ELSEIF和.ENDIF&#xff09;是用来进行条件判断的高层次语法结构&#xff0c;这些伪指令最终会被汇编器转换成相应的低层次机器码。伪指令可以简化条件分支的实现&#xff0c;类似于高级编程语言中的if-els…

vue 使用 ztree 超大量数据,前端树形结构展示

ztree 是一个很经典的基于jquey开发的树结构编辑展示UI组件库。 创建一个文件 ztree.vue&#xff0c;代码如下&#xff1a; <template><div><div class"ztree vue-giant-tree" :id"ztreeId"></div><div class"treeBox&q…

【自动驾驶技术】自动驾驶汽车AI芯片汇总——TESLA篇(FSD介绍)

0. 前言 按照国际惯例&#xff0c;首先声明&#xff1a;本文只是我自己学习的理解&#xff0c;虽然参考了他人的宝贵见解及成果&#xff0c;但是内容可能存在不准确的地方。如果发现文中错误&#xff0c;希望批评指正&#xff0c;共同进步。 本篇文章是这个自动驾驶汽车AI芯片系…