docker-compose部署kafka

docker-compose.yml配置

version: "3"
services:
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '7050:7050'
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:7050,CONTROLLER://:7051
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://183.56.203.157:7050
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@0.0.0.0:7051
      - ALLOW_PLAINTEXT_LISTENER=yes

kafka UI界面

docker run -d --name kafka-map -p 8049:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin dushixiang/kafka-map:latest

docker run -p 8080:8080 -e KAFKA_BROKERS=host.docker.internal:9092 docker.redpanda.com/vectorized/console:master-173596f

UI界面总览

https://towardsdatascience.com/overview-of-ui-tools-for-monitoring-and-management-of-apache-kafka-clusters-8c383f897e80

kafka学习

生产者
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import org.junit.Test
import java.util.*

/**
 * @Description :
 * @Author  xiaomh
 * @date  2022/8/5 15:58
 */
class CustomProducer {

    //异步发送
    @Test
    fun customProducer() {
        //配置
        val properties = Properties()
        //链接kafka
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //发送数据
        for (i in 0 until 5) {
            //黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)
            kafkaProducer.send(ProducerRecord("xiao1", "customProducer,count::$i"))
        }
        //关闭资源
        kafkaProducer.close()
    }

    //同步发送
    @Test
    fun customProducerSync() {
        //配置
        val properties = Properties()
        //链接kafka
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //发送数据
        for (i in 0 until 5) {
            //黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)
            kafkaProducer.send(ProducerRecord("xiao1", "customProducerSync,count::$i")).get()
        }
        //关闭资源
        kafkaProducer.close()
    }

    //回调异步发送
    @Test
    fun customProducerCallback() {
        //配置
        val properties = Properties()
        //链接kafka
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //发送数据
        for (i in 0 until 500) {
            //黏性发送,达到设置的数据最大值/时间后,切换分区(不会是当前分区)
            kafkaProducer.send(ProducerRecord("xiao1", "customProducerCallback,count::$i"), Callback
            { metadata, exception ->
                if (exception == null) {
                    println("主题:${metadata.topic()},分区:${metadata.partition()}")
                }
            })
            //测试分区策略
            Thread.sleep(1)
        }
        //关闭资源
        kafkaProducer.close()
    }

    //回调异步发送+使用分区
    @Test
    fun customProducerCallbackPartitions1() {
        //配置
        val properties = Properties()
        //链接kafka
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //发送数据
        for (i in 0 until 5) {
            //1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
            //2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)
            //key可以作为producer数据名,让consumer通过key找到
            kafkaProducer.send(ProducerRecord("xiao1", 1, "", "customProducerCallbackPartitions,count::$i"), Callback
            { metadata, exception ->
                if (exception == null) {
                    println("主题:${metadata.topic()},分区:${metadata.partition()}")
                }
            })

        }
        //关闭资源
        kafkaProducer.close()
    }

    //回调异步发送+自定义分区
    @Test
    fun customProducerCallbackPartitions2() {
        //配置
        val properties = Properties()
        //链接kafka,集群链接使用"183.56.203.157:7050,183.56.203.157:7051"
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        //关联自定义分区器
        properties[ProducerConfig.PARTITIONER_CLASS_CONFIG] =
            "com.umh.medicalbookingplatform.b2bapi.config.MyPartitioner"
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //发送数据
        for (i in 0 until 50) {
            //1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
            //2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)
            //key可以作为producer数据名,让consumer通过key找到
            kafkaProducer.send(ProducerRecord("xiao1", "felix is strong,count::$i"), Callback
            { metadata, exception ->
                if (exception == null) {
                    println("主题:${metadata.topic()},分区:${metadata.partition()}")
                }
            })

        }
        //关闭资源
        kafkaProducer.close()
    }

    //自定义配置缓冲区、批次、等待时间、压缩
    @Test
    fun customProducerParameters() {
        //配置
        val properties = Properties()
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name

        //缓冲区大小。默认32,64=33554432x2
        properties[ProducerConfig.BUFFER_MEMORY_CONFIG] = 33554432
        //批次大小。默认16k
        properties[ProducerConfig.BATCH_SIZE_CONFIG] = 16384
        //等待时间。默认0
        properties[ProducerConfig.LINGER_MS_CONFIG] = 1
        //压缩.压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
        properties[ProducerConfig.COMPRESSION_TYPE_CONFIG] = "snappy"
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)

        for (i in 0 until 10) {
            //1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
            //2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)
            //key可以作为producer数据名,让consumer通过key找到
            kafkaProducer.send(ProducerRecord("xiao1", "customProducerParameters::$i"), Callback
            { metadata, exception ->
                if (exception == null) {
                    println("主题:${metadata.topic()},分区:${metadata.partition()}")
                }
            })

        }
        //关闭资源
        kafkaProducer.close()
    }

    //ack、重试次数配置
    @Test
    fun customProducerAck() {
        //配置
        val properties = Properties()
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name

        //ack
        properties[ProducerConfig.ACKS_CONFIG] = "1"
        //重试次数
        properties[ProducerConfig.RETRIES_CONFIG] = 30

        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)

        for (i in 0 until 10) {
            //1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
            //2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)
            //key可以作为producer数据名,让consumer通过key找到
            kafkaProducer.send(ProducerRecord("xiao1", "customProducerAck::$i"), Callback
            { metadata, exception ->
                if (exception == null) {
                    println("主题:${metadata.topic()},分区:${metadata.partition()}")
                }
            })

        }
        //关闭资源
        kafkaProducer.close()
    }

    //事物
    @Test
    fun customProducerTransaction() {
        //配置
        val properties = Properties()
        properties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //指定对应key和value的序列化类型(二选一)
//        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "org.apache.kafka.common.serialization.StringSerializer"
        properties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        properties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
        //指定事务id,一定要指定!!
        properties[ProducerConfig.TRANSACTIONAL_ID_CONFIG] = UUID.randomUUID().toString()
        //创建kafka生产者对象
        val kafkaProducer = KafkaProducer<String, String>(properties)
        //开启事务
        kafkaProducer.initTransactions()
        kafkaProducer.beginTransaction()
        try {
            for (i in 0 until 10) {
                //1.没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
                //2.既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)
                //key可以作为producer数据名,让consumer通过key找到
                kafkaProducer.send(ProducerRecord("xiao1", "customProducerTransaction::$i"), Callback
                { metadata, exception ->
                    if (exception == null) {
                        println("主题:${metadata.topic()},分区:${metadata.partition()}")
                    }
                })
            }
//            val test: Int = 1 / 0
            kafkaProducer.commitTransaction()
        } catch (e: Exception) {
            kafkaProducer.abortTransaction()
        } finally {
            //关闭资源

            kafkaProducer.close()
        }
    }

}

消费者

1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个 partition的数据。

2、Kafka有四种主流的分区分配策略: Range、RoundRobin、Sticky、CooperativeSticky。 可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky。Kafka可以同时使用 多个分区分配策略。

3、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的过长(max.poll.interval.ms5分钟),也会触发再 平衡

package com.umh.medicalbookingplatform.api

import com.alibaba.fastjson.parser.ParserConfig
import com.fasterxml.jackson.databind.MapperFeature
import com.umh.medicalbookingplatform.core.audit.SpringSecurityAuditorAware
import com.umh.medicalbookingplatform.core.config.CoreConfiguration
import com.umh.medicalbookingplatform.core.jsonview.JsonViews
import com.umh.medicalbookingplatform.core.properties.ApplicationProperties
import com.umh.medicalbookingplatform.core.utils.ApplicationJsonObjectMapper
import org.jboss.resteasy.client.jaxrs.ResteasyClientBuilder
import org.keycloak.OAuth2Constants
import org.keycloak.admin.client.Keycloak
import org.keycloak.admin.client.KeycloakBuilder
import io.swagger.v3.oas.models.Components
import io.swagger.v3.oas.models.OpenAPI
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.boot.web.servlet.ServletComponentScan
import org.springframework.cache.annotation.EnableCaching
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Import
import org.springframework.data.domain.AuditorAware
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
import org.springframework.http.MediaType
import org.springframework.http.converter.HttpMessageConverter
import org.springframework.http.converter.ResourceHttpMessageConverter
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter
import org.springframework.scheduling.annotation.EnableScheduling
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer
import java.security.Security
import java.util.*
import io.swagger.v3.oas.models.info.Info
import io.swagger.v3.oas.models.info.License
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.keycloak.adapters.KeycloakConfigResolver
import org.keycloak.adapters.springboot.KeycloakSpringBootConfigResolver
import org.keycloak.adapters.springboot.KeycloakSpringBootProperties
import org.springframework.http.converter.StringHttpMessageConverter
import java.time.Duration
import java.util.concurrent.TimeUnit


@EnableJpaAuditing
@EnableCaching
@EnableScheduling
@SpringBootApplication
@Import(CoreConfiguration::class)
@ServletComponentScan("com.umh.medicalbookingplatform")
open class ApiApplication : WebMvcConfigurer {

    @Autowired
    private lateinit var appProperties: ApplicationProperties

    @Autowired
    private lateinit var keycloakSpringBootProperties: KeycloakSpringBootProperties

    @Bean
    fun keycloakConfigResolver(): KeycloakConfigResolver {
        return KeycloakSpringBootConfigResolver()
    }

    @Bean
    fun fastJson(){
        ParserConfig.getGlobalInstance().isAutoTypeSupport = true
    }

    @Bean
    fun customConsumer() {
        //配置
        val properties = Properties()
        //连接
        properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //反序列化(注意写法:生产者是序列化,消费者是反序列化)
        properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        //配置消费者组id(就算消费者组只有一个消费者也需要)
        //当消费者组ID相同时,表示他们在同一个消费者组
        //当有三个分区,而消费者组里又有三个消费者时,消费者会各自自动选取一个分区进行消费
        properties[ConsumerConfig.GROUP_ID_CONFIG] = "test"
        //1.创建一个消费者
        val kafkaConsumer = KafkaConsumer<String, String>(properties)
        //2.定义主题 xiao1
        val topics = mutableListOf<String>()
        topics.add("xiao1")
        kafkaConsumer.subscribe(topics)
        //3.消费数据
        while (true) {
            val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))
            for (msg in consumerRecord) {
                println("consumer,msg:::$msg")
            }
        }
    }

//    @Bean
    fun customConsumerPartition() {
        //配置
        val properties = Properties()
        //连接
        properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //反序列化(注意写法:生产者是序列化,消费者是反序列化)
        properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        //配置消费者组id(就算消费者组只有一个消费者也需要)
        //当消费者组ID相同时,表示他们在同一个消费者组
        properties[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()
        //1.创建一个消费者
        val kafkaConsumer = KafkaConsumer<String, String>(properties)
        //2.定义主题对应的分区
        val topicPartition = mutableListOf<TopicPartition>()
        topicPartition.add(TopicPartition("xiao1", 1))
        kafkaConsumer.assign(topicPartition)
        //3.消费数据
        while (true) {
            val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))
            for (msg in consumerRecord) {
                println("msg:::$msg")
            }
        }
    }

    @Bean(name = ["keycloakGlobalCmsApi"])
    fun keycloakGlobalCmsApiInstance(): Keycloak {
        return KeycloakBuilder.builder()
            .serverUrl(appProperties.keycloakAuthServerUrl)//https://keycloak.umhgp.com/auth
            .realm(appProperties.keycloakGlobalCmsRealm)//global_cms
            .clientId(appProperties.keycloakGlobalCmsClient)//global-cms
            .username(appProperties.keycloakApiUsername)//medical-booking-platform-system-uat
            .password(appProperties.keycloakApiPassword)//Kas7aAnC76eGVHv5
            .grantType(OAuth2Constants.PASSWORD)
            .resteasyClient(
                ResteasyClientBuilder()
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .readTimeout(10, TimeUnit.SECONDS)
                    .connectionPoolSize(100).build()
            ).build()
    }

    @Bean(name = ["keycloakGlobalProfileApi"])
    fun keycloakGlobalProfileApiInstance(): Keycloak {
        return KeycloakBuilder.builder()
            .serverUrl(appProperties.keycloakAuthServerUrl)
            .realm(appProperties.keycloakGlobalProfileRealm)
            .clientId(appProperties.keycloakGlobalProfileClient)
            .username(appProperties.keycloakApiUsername)
            .password(appProperties.keycloakApiPassword)
            .grantType(OAuth2Constants.PASSWORD)
            .resteasyClient(
                ResteasyClientBuilder()
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .readTimeout(10, TimeUnit.SECONDS)
                    .connectionPoolSize(100).build()
            ).build()
    }

    @Bean(name = ["keycloakBookingSystemApi"])
    fun keycloakBookingSystemApiInstance(): Keycloak {
        return KeycloakBuilder.builder()
            .serverUrl(appProperties.keycloakAuthServerUrl)
            .realm(appProperties.keycloakBookingSystemRealm)
            .clientId(appProperties.keycloakBookingSystemClient)
            .username(appProperties.keycloakApiUsername)
            .password(appProperties.keycloakApiPassword)
            .grantType(OAuth2Constants.PASSWORD)
            .resteasyClient(
                ResteasyClientBuilder()
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .readTimeout(10, TimeUnit.SECONDS)
                    .connectionPoolSize(100).build()
            ).build()
    }

    @Bean(name = ["keycloakUmhBookingSystemApi"])
    fun keycloakBookingSystemUmhApiInstance(): Keycloak {
        return KeycloakBuilder.builder()
            .serverUrl(appProperties.keycloakAuthServerUrl)
            .realm(appProperties.keycloakUmhBookingSystemRealm)
            .clientId(appProperties.keycloakUmhBookingSystemClient)
            .username(appProperties.keycloakApiUsername)
            .password(appProperties.keycloakApiPassword)
            .grantType(OAuth2Constants.PASSWORD)
            .resteasyClient(
                ResteasyClientBuilder()
                    .connectTimeout(10, TimeUnit.SECONDS)
                    .readTimeout(10, TimeUnit.SECONDS)
                    .connectionPoolSize(100).build()
            ).build()
    }

    @Bean
    internal fun auditorProvider(): AuditorAware<UUID> {
        return SpringSecurityAuditorAware()
    }
    @Bean
    fun customOpenAPI(): OpenAPI? {
        return OpenAPI()
            .components(Components())
            .info(
                Info().title("medical-booking-platform").version("1.5.8")
                    .license(License().name("Apache 2.0").url("http://springdoc.org"))
            )
    }


    override fun configureMessageConverters(converters: MutableList<HttpMessageConverter<*>>) {
//        ActuatorMediaTypes()
        val supportedMediaTypes = ArrayList<MediaType>()
        supportedMediaTypes.add(MediaType.APPLICATION_JSON)
        supportedMediaTypes.add(MediaType.valueOf("application/vnd.spring-boot.actuator.v3+json"))
        supportedMediaTypes.add(MediaType.TEXT_PLAIN)
        val converter = MappingJackson2HttpMessageConverter()
        val objectMapper = ApplicationJsonObjectMapper()
        objectMapper.setConfig(objectMapper.serializationConfig.withView(JsonViews.Admin::class.java))
        objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, true)

        converter.objectMapper = objectMapper
        converter.setPrettyPrint(true)
        converter.supportedMediaTypes = supportedMediaTypes

        converters.add(0, StringHttpMessageConverter())
        converters.add(1, converter)

        converters.add(ResourceHttpMessageConverter())
    }

}

fun main(args: Array<String>) {
    Security.setProperty("crypto.policy", "unlimited")

    runApplication<ApiApplication>(*args)
}

range(范围)

Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策 略。

消费者分区操作:7分区2个消费者时

消费者1:消费分区0123

消费者2:消费分区456


在同一个消费者组,三消费者的情况下,如果其中一个宕机,45秒后会把消费者0需要处理的数据整个搬到消费者1或者消费者2.

结果:Consumer1=01234 或者 Consumer2=01256

随后如果再传输数据,消费者组会根据当前的消费者重新组织分配

Consumer0宕机45秒后再次传数据结果:Consumer1=0123 Consumer2=456

RoundRobin(轮询)

RoundRobin 针对集群中所有Topic而言。 RoundRobin 轮询分区策略,是把所有的 partition 和所有的 consumer 都列出来,然后按照 hashcode 进行排序,最后 通过轮询算法来分配 partition 给到各个消费者。

策略分配的修改

    @Bean
    fun customConsumer() {
        //配置
        val properties = Properties()
        //连接
        properties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "183.56.218.28:8000"
        //反序列化(注意写法:生产者是序列化,消费者是反序列化)
        properties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        properties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java.name
        //配置消费者组id(就算消费者组只有一个消费者也需要)
        //当消费者组ID相同时,表示他们在同一个消费者组
        //当有三个分区,而消费者组里又有三个消费者时,消费者会各自自动选取一个分区进行消费
        properties[ConsumerConfig.GROUP_ID_CONFIG] = "test"
        //设置分区分配策略
        properties[ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG] = "org.apache.kafka.clients.consumer.RoundRobinAssignor"
        //1.创建一个消费者
        val kafkaConsumer = KafkaConsumer<String, String>(properties)
        //2.定义主题 xiao1
        val topics = mutableListOf<String>()
        topics.add("xiao1")
        kafkaConsumer.subscribe(topics)
        //3.消费数据
        while (true) {
            val consumerRecord: ConsumerRecords<String, String> = kafkaConsumer.poll(Duration.ofSeconds(1))
            for (msg in consumerRecord) {
                println("consumer,msg:::$msg")
            }
        }
    }

注意:06为一组给到一个消费者,3为一组给到另外一个消费者。45秒后重新发送数据,consumer2:0246,consumer3:135

Sticky (黏性)

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5、3 号分区数据。

2 号消费者:消费到 4、6 号分区数据。

0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别 由 1 号消费者或者 2 号消费者消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需 要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 2、3、5 号分区数据。

2 号消费者:消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

随机+均匀

宕机后分配的消费者和45秒后分配消费者一样

宕机(3消费者变2消费者):1403,235

45秒后2消费者:1403,235

本文转自 https://blog.csdn.net/weixin_52925162/article/details/126280062?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522170100111416800225544545%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=170100111416800225544545&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-8-126280062-null-null.142v96pc_search_result_base9&utm_term=keycloak%20docker-compose&spm=1018.2226.3001.4187,如有侵权,请联系删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/270927.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

面向对象设计与分析40讲(15)简单工厂方法模式

文章目录 定义示例优缺点定义 简单工厂模式是一种创建型模式,用于根据客户端的需求创建对象实例,所谓的需求反映到编程语言里就是传入的参数。 简单工厂模式包括三个主要部分: 工厂类(Simple Factory):这是整个模式的核心。它负责根据客户端的请求来创建并返回相应的对…

【教程】从gitee或者github,下载单个文件或文件夹命令

1.打开git 2.初始化 git init 3.设置允许下载子目录 &#xff08;不需要修改任何&#xff0c;只要原样复制&#xff0c;需要按照个人状况修改的话我会标注&#xff09; git config core.sparseCheckout true 4. 选择要下载的单个文件夹的路径 这里单引号内部需要修改&…

Shell 脚本基础

Shell脚本 脚本以#!/bin/bash开头 执行方式 直接使用文件名执行&#xff1a;文件需要执行权限 以bash xxx.sh来执行, 本质上是bash解析器去执行, 文件作为一个输入, 因此可以不需要执行权限 变量 系统变量 自定义变量 定义变量 # 定义一个变量username, 注意不能有多余…

一体机定制_工控触控一体机安卓主板方案

工控一体机是一种集成化的硬件方案&#xff0c;采用了联发科MT8768八核芯片和12nm制程工艺。该芯片拥有2.0GHz的主频和IMG PowerVR GE8320图形处理GPU&#xff0c;具备强大的视频处理能力&#xff0c;并且兼容大部分的视频格式和解码能力。工控一体机搭载了Android 9.0操作系统…

学Java的第二天

一、常量 1.值不可以变化的量。 2. 分类&#xff1a; 字符串常量 用双引号括起来的多个字符&#xff0c;可以包含 0、1 或多个&#xff0c;例如 "a" 、 "abc" 、 " 中国 " 整数常量&#xff0c;例如&#xff1a; -10 、 0 、 88 小数常量&…

浅谈Java反射中的getFields()方法和getDeclaredFields ()方法

目录 1. 概念2. getFields()方法2. getDeclaredFields()方法4. 总结 1. 概念 反射是Java中一种强大的机制&#xff0c;允许在运行时获取、检查和操作类、方法、字段等信息&#xff0c;而不需要在编译时知道这些信息。 其中字段&#xff08;Field&#xff09;在Java中是类中用…

【精选】vulnhub CTF5 NanoCMS漏洞 (青铜门笔记)

&#x1f36c; 博主介绍&#x1f468;‍&#x1f393; 博主介绍&#xff1a;大家好&#xff0c;我是 hacker-routing &#xff0c;很高兴认识大家~ ✨主攻领域&#xff1a;【渗透领域】【应急响应】 【python】 【VulnHub靶场复现】【面试分析】 &#x1f389;点赞➕评论➕收藏…

微信小程序使用canvas制作海报并保存到本地相册(超级详细)

案例图 分析案例图都有哪些元素 1.渐变背景 2.圆形头像 3.文字 4.文字超出换行 5.图片居中 6.文字居中 7.单位适配 8.弹窗保存图片。因为一个个绘制图形太麻烦所以这里都采用了方法封装。 canvas api介绍 最后有全部代码&#xff0c;复制即用。 data数据 data() {return {myO…

并发程序设计--D1进程的创建和回收

进程和程序内容区别 进程包含的内容&#xff1a; BSS段&#xff1a;存放程序中未初始化的全局变量 数据段&#xff1a;已初始化的全局变量 代码段&#xff1a;程序执行代码 堆&#xff08;heap&#xff09;&#xff1a;malloc等函数分配内存 栈(stack)&#xff1a;局部变量…

升压斩波电路的simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 一、升压斩波电路概述 二、升压斩波电路的基本工作原理 5.完整工程文件 1.课题概述 升压斩波电路的simulink建模与仿真&#xff0c;通过双闭环结构实现电池&#xff0c;点击的控制。 2.系统仿真结果 …

复试情报准备

英语自我介绍&#xff0c;介绍完老师会根据你的回答用英语问你问题&#xff0c;比如介绍一下你的本科学校&#xff0c;或者家乡什么的。计网过一遍&#xff0c;会问两道题。接下来是重点&#xff0c;我当时是根据我成绩单&#xff0c;问了我本科学过的科目&#xff0c;比如pyth…

PPT中加入页码

PPT中加入页码 文章目录 简单版本样式更改 简单版本 PPT中插入页码&#xff0c;基础的就是在“插入”选项卡中单机“幻灯片编号”即可 样式更改 然而&#xff0c;就像我们做幻灯片不满足于白底黑字一样&#xff0c;页码也总不能是默认的样式。 比如&#xff0c;在页码下面…

Spring Cloud Gateway 网关整合 Knife4j 4.3 实现微服务接口文档聚合

&#x1f680; 作者主页&#xff1a; 有来技术 &#x1f525; 开源项目&#xff1a; youlai-mall &#x1f343; vue3-element-admin &#x1f343; youlai-boot &#x1f33a; 仓库主页&#xff1a; Gitee &#x1f4ab; Github &#x1f4ab; GitCode &#x1f496; 欢迎点赞…

QT小技巧 - 使用QMovie进行gif切帧

简介 使用QMovie 将 gif 进行切帧&#xff0c; magick 进行合并代码 QString gifPath "E:\\workspace\\qt\\gif2imgs\\203526qre64haq3ccoobqi.gif"; // 你的图片QMovie movie(gifPath); movie.setCacheMode(QMovie::CacheNone);qDebug() << movie.frameCou…

31. Ajax

简介 AJAX 是 Asynchronous JavaScript And XML 的简称。直译为&#xff0c;异步的JS和XML。AJAX的实际意义是&#xff0c;不发生页面跳转、异步载入内容并改写页面内容的技术。AJAX也可以简单的理解为通过JS向服务器发送请求。 AJAX这门技术很早就被发明&#xff0c;但是直到…

基于AR+地图导航的景区智慧导览设计

随着科技的飞速发展&#xff0c;智慧旅游已经成为现代旅游业的一个重要趋势。在这个背景下&#xff0c;景区智慧导览作为智慧旅游的核心组成部分&#xff0c;正逐渐受到越来越多游客的青睐。本文将深入探讨地图导航软件在景区智慧导览中的应用&#xff0c;并分析其为游客和景区…

C++中的存储类及其实例

文章目录 0. 语法1. 自动存储类自动存储类对象的属性自动存储类的例子 2. 外部存储类extern存储类对象的属性extern存储类的例子 3. 静态存储类静态存储类的属性静态存储类的例子 4. 寄存器存储类寄存器存储类对象的属性寄存器存储类例子 5. 可变&#xff08;mutable&#xff0…

iMazing2024免费版iOS移动设备管理软件

以自己的方式管理iPhone&#xff0c;让备受信赖的软件为您传输和保存音乐、消息、文件和数据。安全备份任何 iPhone、iPad 或 iPod touch。iMazing 功能强大、易于使用&#xff0c;称得上是 Mac 和 PC 上最好的 iOS 设备管理器。 正在为iTunes繁琐的操作发愁&#xff1f;设备数…

86% 的网络攻击是通过加密渠道进行

自 2022 年以来&#xff0c;HTTPS 威胁增长了 24%&#xff0c;凸显了针对加密通道的网络犯罪策略的复杂性。 制造业连续第二年成为最常受到攻击的行业&#xff0c;教育和政府组织的攻击同比增幅最高。此外&#xff0c;包括恶意 Web 内容和恶意软件负载在内的恶意软件继续主导其…

数字化转型是什么?有哪些应用?_光点科技

数字化转型是什么&#xff1f; 数字化转型是指企业或组织通过采用数字技术来改变其业务模式和运营方式&#xff0c;以适应新兴市场趋势、提高效率、增强客户体验和增加竞争优势的过程。它不仅涉及技术的变革&#xff0c;还包括企业文化、组织结构和业务流程的全面调整。数字化…