springboot kafka多数据源,通过配置动态加载发送者和消费者

前言

最近做项目,需要支持kafka多数据源,实际上我们也可以通过代码固定写死多套kafka集群逻辑,但是如果需要不修改代码扩展呢,因为kafka本身不处理额外逻辑,只是起到削峰,和数据的传递,那么就需要对架构做一定的设计了。

准备test

kafka本身非常容易上手,如果我们需要单元测试,引入jar依赖,JDK使用1.8,当然也可以使用JDK17

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.17</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.7.17</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.13</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>2.9.13</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.testcontainers/kafka -->
        <dependency>
            <groupId>org.testcontainers</groupId>
            <artifactId>kafka</artifactId>
            <version>1.20.1</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

修改发送者和接收者

@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
    }
    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}

然后写main方法,随意写一个即可,配置入戏

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: mytest
test:
  topic: embedded-test-topic

写一个单元测试

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class DemoMainTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    void embedKafka() throws InterruptedException {
        String data = "Sending with our own simple KafkaProducer";

        producer.send(topic, data);
        Thread.sleep(3000);
        assertThat(consumer.getPayload(), containsString(data));
        Thread.sleep(10000);
    }
}

通过

@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })

直接模拟一个kafka,里面有一些注解参数,可以设置broker的 数量端口,zk的端口,topic和partition数量等

实际上是通过embed zk和kafka来mock了一个kafka server

单元测试运行成功

思路

有了kafka单元测试后,根据springboot map可以接收多套配置的方式不就实现了kafka的多数据源的能力,貌似非常简单;但是如果需要不用修改代码,消费端怎么办,发送者可以手动创建,消费端是注解方式,topic等信息在注解参数中,注解参数值却是常量,代码写死的,那么我们就需要:

  1. 不让Springboot自动扫描,根据配置手动扫描注册bean
  2. 字节码生成bean,就可以根据参数

这里没考虑把消费端和发送者的额外处理逻辑写在这里的做法,统一处理kafka,类似kafka网关,因为kafka一般不会仅一套,且不会仅有一个topic,需要分发处理,比如slb,feign等。

kafka消费者的原理 

其实kafka发送者和消费者也是类似逻辑,但是spring-kafka通过注解方式实现消费者,如果我们使用原生kafka的kafkaconsumer,那么只需要通过Map接收参数,然后自己实现消费逻辑就行,但是spring-kafka毕竟做了很多公共没必要的逻辑,拉取消费的一系列参数,线程池管理等处理措施。看看Spring-kafka的消费者初始化原理,

BeanPostProcessor的kafka实现

org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor

看前置处理

什么都没做,所以,所有逻辑都在后置处理

public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
            //找到注解,消费注解KafkaListener打在类上,一般不用这种方式
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
            //类上KafkaListener注解的标志
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
            //找到消费方法,去每个方法上找KafkaListener注解
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
                //类上KafkaListener注解的时候,通过另外的注解KafkaHandler的方式,找到消费方法
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
            //实际上大部分类是没有kafka消费注解的,效率并不高,但是因为日志是trace,所以日志一般默认看不见
            //注解KafkaListener打在方法上的时候
			if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
                        //核心逻辑
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
            //注解KafkaListener打在类上,实际上处理逻辑跟KafkaListener打在方法上差不多
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

如果是注解打在类上,如下

 

本文中的示例的@KafkaListener打在方法上,所以分析

processKafkaListener 

其实原理都一样,spring-kafka不会写2份一样逻辑,只是读取处理的参数略有不同

protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
			String beanName) {
        //检查代理
		Method methodToUse = checkProxy(method, bean);
        //终端设计思想,Spring很多地方都这样设计,尤其是swagger
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
        //bean的名称,这里需要定制全局唯一,否则多个listener会冲突
		String beanRef = kafkaListener.beanRef();
		this.listenerScope.addListener(beanRef, bean);
		String[] topics = resolveTopics(kafkaListener);
		TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
		if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
            //核心逻辑
			processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
		}
		this.listenerScope.removeListener(beanRef);
	}

继续

processListener
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
								Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
        //MethodKafkaListenerEndpoint赋值了,这个很关键
		processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
        //容器工厂
		String containerFactory = resolve(kafkaListener.containerFactory());
		KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
				containerFactory, beanName);
        //注册终端,最终生效
		this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
	}

processKafkaListenerAnnotation

private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint,
			KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {

		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(tps);
		endpoint.setTopics(topics);
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		resolveKafkaProperties(endpoint, kafkaListener.properties());
		endpoint.setSplitIterables(kafkaListener.splitIterables());
		if (StringUtils.hasText(kafkaListener.batch())) {
			endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
		}
		endpoint.setBeanFactory(this.beanFactory);
		resolveErrorHandler(endpoint, kafkaListener);
		resolveContentTypeConverter(endpoint, kafkaListener);
		resolveFilter(endpoint, kafkaListener);
	}

各种参数注册,尤其是其中的ID和handler是必须的,不注册不行;笔者试着自己设置endpoint,发现其中的各种handler注册。 

解决方式

先写一个工具类,用于创建一些关键类的bean,定义了发送者创建,消费者工厂类,消费者的创建由注解扫描实现,引用工具类的消费者容器工厂bean。

public class KafkaConfigUtil {

    private DefaultKafkaProducerFactory<String, String> initProducerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    public KafkaTemplate<String, String> initKafkaTemplate(KafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(initProducerFactory(kafkaProperties));
    }

    private ConsumerFactory<? super Integer, ? super String> initConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    initKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(kafkaProperties));
        return factory;
    }
}

1、通过Map接收多数据源

定义一个配置接收器,仿造zuul的模式 


@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaMultiProperties {

    private Map<String, KafkaProperties> routes;

    public Map<String, KafkaProperties> getRoutes() {
        return routes;
    }

    public void setRoutes(Map<String, KafkaProperties> routes) {
        this.routes = routes;
    }
}

每一个route其实就说一套kafka,再写一个Configuration,注入配置文件

@Configuration
@EnableConfigurationProperties(KafkaMultiProperties.class)
public class KafkaConfiguration {
    
}

这样就可以注入配置了,从此可以根据配置的不同初始化不同的kafka集群逻辑。 这样就可以把自定义的Properties注入Springboot的placeholder中。

2、通过自定义扫描支持消费者

如果消费者或者发送者逻辑需要写在当前kafka网关应用,那么只能通过自定义扫描方式支持配置不同,所有配置的生成者和消费者必须代码实现逻辑,通过配置加载方式,自定义扫描注入bean即可。以消费者为例,生产者不涉及注解发送方式相对简单。

public class KafkaConfigInit {

    private KafkaMultiProperties kafkaMultiProperties;

    private ConfigurableApplicationContext applicationContext;

    public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties,
                           ConfigurableApplicationContext applicationContext) {
        this.kafkaMultiProperties = kafkaMultiProperties;
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void initConfig() {
        if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
        kafkaMultiProperties.getRoutes().forEach((k, v) -> {
            //register producer by config
            ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
            beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));

            //register consumer container factory
            KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
            beanFactory.registerSingleton(k + "_consumerFactory", kafkaListenerContainerFactory);
        });
    }
}

写了一个初始化的bean,用于通过配置加载bean。但是有2个问题:

  1. 消费者是注解方式扫描,bean需要根据配置加载,不能写在代码里面
  2. 这里仅仅是注册bean,并不会被beanpostprocessor处理

关于第1点

因为需要按照配置加载,不能代码写bean的加载逻辑,只能自己扫描按照配置加载,那么需要自定义扫描注解和扫描包名(减少扫描范围,提高效率)

关于第2点

需要手动执行beanpostprocessor的逻辑即可

show me the code

完善刚刚写的部分代码:

写一个注解

@Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface KafkaConfigConsumer {
    String beanId() default "";
}

通过beanId区分,配置文件的key+"_consumer"可以作为唯一标识,定义一种标准

可以使用Spring的

PathMatchingResourcePatternResolver

自己解析resources信息,来拿到写的自定义注解的类,然后生成对象,注入Spring

public class KafkaConfigInit {

    private KafkaMultiProperties kafkaMultiProperties;

    private ConfigurableApplicationContext applicationContext;

    private KafkaListenerAnnotationBeanPostProcessor<?,?> kafkaListenerAnnotationBeanPostProcessor;

    private static final Map<String, Object> consumerMap = new ConcurrentHashMap<>();

    public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties, ConfigurableApplicationContext applicationContext, KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor) {
        this.kafkaMultiProperties = kafkaMultiProperties;
        this.applicationContext = applicationContext;
        this.kafkaListenerAnnotationBeanPostProcessor = kafkaListenerAnnotationBeanPostProcessor;
    }

    @PostConstruct
    public void initConfig() throws IOException {
        scanConsumer();
        if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
        kafkaMultiProperties.getRoutes().forEach((k, v) -> {
            //register producer by config
            ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
            beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));

            //register consumer container factory
            KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
            beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);

            beanFactory.registerSingleton(k+"_consumer", consumerMap.get(k+"_consumer"));
            kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(consumerMap.get(k+"_consumer"), k+"_consumer");
        });
    }

    private void scanConsumer() throws IOException {
        SimpleMetadataReaderFactory register = new SimpleMetadataReaderFactory();
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] resources = resolver.getResources(ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + "com/feng/kafka/demo/init/*");
        Arrays.stream(resources).forEach((resource)->{
            try {
                MetadataReader metadataReader = register.getMetadataReader(resource);
                if (metadataReader.getAnnotationMetadata().hasAnnotatedMethods("org.springframework.kafka.annotation.KafkaListener")){
                    String className = metadataReader.getClassMetadata().getClassName();
                    Class<?> clazz = Class.forName(className);
                    KafkaConfigConsumer kafkaConfigConsumer = clazz.getDeclaredAnnotation(KafkaConfigConsumer.class);
                    Object obj = clazz.newInstance();
                    consumerMap.put(kafkaConfigConsumer.beanId(), obj);
                }
            } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        });
    }

}

同时,需要手动执行 

kafkaListenerAnnotationBeanPostProcessor

的逻辑,上面有源码分析,而且因为要支持多数据源,所以需要修改消费者的注解参数

//@KafkaListener(topics = "${test.topic}")
//@Component
@KafkaConfigConsumer(beanId = "xxx_consumer")
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private String payload;

//    @KafkaHandler
    @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
    }


    // other getters


    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}

增加beanRef属性外加我们自己写的注解,然后通过@Configuration注入

@Configuration
@EnableConfigurationProperties(KafkaMultiProperties.class)
public class KafkaConfiguration {

    @Bean
    public KafkaConfigInit initKafka(KafkaMultiProperties kafkaMultiProperties,
                                     ConfigurableApplicationContext applicationContext,
                                     KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor){
        return new KafkaConfigInit(kafkaMultiProperties, applicationContext, kafkaListenerAnnotationBeanPostProcessor);
    }
}

然后修改配置文件和单元测试类

spring:
  kafka:
    routes:
      xxx:
        producer:
          batchSize: 1
        consumer:
          auto-offset-reset: earliest
          group-id: xxx

然后修改单元测试代码

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class DemoMainTest {

    @Lazy
    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private ApplicationContext applicationContext;

    @Value("${test.topic}")
    private String topic;

    @Test
    void embedKafka() throws InterruptedException {
        String data = "Sending with our own simple KafkaProducer";

        applicationContext.getBean("xxx_producer", KafkaTemplate.class).send(topic, data);
        Thread.sleep(3000);
        assertThat(consumer.getPayload(), containsString(data));
        Thread.sleep(10000);
    }
}

执行单元测试成功

 

数据正确发送消费,断言正常 

3、通过字节码生成支持消费者

上面的方式觉得还是不方便,一般而言处理消息和消费消息是异步的,即使是同步也不会在消费线程直接处理,一般是发送到其他地方接口处理,所以为啥还要写消费者代码呢,默认一个不就好了,但是注解参数确是常量,那么字节码生成一个唯一的类即可。

如果生成者和消费者处理逻辑不用网关应用处理,那么仅仅是无脑转发,类似zuul,可以通过字节码生成方式实现统一逻辑,主要是消费者,毕竟有注解,生产者不存在注解可以直接new出来注入bean。

以javassist为例,简单些,当然asm也可以

show me the code

其实就说把扫描的消费者类,变成固定某个类消费

//@KafkaListener(topics = "${test.topic}")
//@Component
//@KafkaConfigConsumer(beanId = "xxx_consumer")
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private String payload;

//    @KafkaHandler
//    @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
    }

去掉注解,因为注解需要我们动态加上去,下一步修改bean创建流程

    @PostConstruct
    public void initConfig() throws IOException {
//        scanConsumer();
        if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) return;
        kafkaMultiProperties.getRoutes().forEach((k, v) -> {
            //register producer by config
            ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
            beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));

            //register consumer container factory
            KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory = KafkaConfigUtil.initKafkaListenerContainerFactory(v);
            beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);

//            beanFactory.registerSingleton(k + "_consumer", consumerMap.get(k + "_consumer"));
            Object obj = initConsumerBean(k);
            beanFactory.registerSingleton(k + "_consumer", obj);
            kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(obj, k + "_consumer");
        });
    }

    private Object initConsumerBean(String key) {
        try {
            ClassPool pool = ClassPool.getDefault();
            CtClass ct = pool.getCtClass("com.feng.kafka.demo.init.KafkaConsumer");
            //修改类名,避免重复
            ct.setName("com.feng.kafka.demo.init.KafkaConsumer"+key);
            //获取类中的方法
            CtMethod ctMethod = ct.getDeclaredMethod("receive");
            MethodInfo methodInfo = ctMethod.getMethodInfo();
            ConstPool cp = methodInfo.getConstPool();
            //获取注解属性
            AnnotationsAttribute attribute = new AnnotationsAttribute(cp, AnnotationsAttribute.visibleTag);
            Annotation annotation = new Annotation("org.springframework.kafka.annotation.KafkaListener", cp);
            ArrayMemberValue arrayMemberValue = new ArrayMemberValue(cp);
            arrayMemberValue.setValue(new MemberValue[]{new StringMemberValue("embedded-test-topic", cp)});
            annotation.addMemberValue("topics", arrayMemberValue);
            annotation.addMemberValue("beanRef", new StringMemberValue(key+"_listener", cp));
            annotation.addMemberValue("containerFactory", new StringMemberValue(key+"_containerFactory", cp));
            attribute.addAnnotation(annotation);
            methodInfo.addAttribute(attribute);
            byte[] bytes = ct.toBytecode();
            Class<?> clazz = ReflectUtils.defineClass("com.feng.kafka.demo.init.KafkaConsumer" + key, bytes, Thread.currentThread().getContextClassLoader());
            return clazz.newInstance();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

通过字节码生成和动态加载class方式,生成唯一的对象,实现通过配置方式支持多数据源,不需要写一句消费代码。

单元测试

去掉了断言,因为类是动态变化的了。 

总结

实际上spring-kafka已经非常完善了,spring-kafka插件的支持也很完善,不需要关注kafka的消费过程,只需要配置即可,但是也为灵活性埋下了隐患,当然一般而言我们基本上用不到多kafka的情况,也不会做一个kafka网关应用,不过当业务需要的时候,可以设计一套kafka网关应用,分发kafka的消息,起到一个流量网关的能力,解耦业务的应用,实现架构的松耦合。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/889408.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

FastAPI框架使用枚举来型来限定参数、FastApi框架隐藏没多大意义的Schemes模型部分内容以及常见的WSGI服务器Gunicorn、uWSGI了解

一、FastAPI框架使用枚举来型来限定参数 FastAPI框架验证时&#xff0c;有时需要通过枚举的方式来限定参数只能为某几个值中的一个&#xff0c;这时就可以使用FastAPI框架的枚举类型Enum了。publish:December 23, 2020 -Wednesday 代码如下&#xff1a; #引入Enum模块 from fa…

Python常用的函数大全!

对Python的内置函数进行了非常详细且有条理的分组和描述。 第一组 print()&#xff1a;用于输出信息到控制台。input()&#xff1a;用于从用户那里接收输入。len()&#xff1a;返回对象&#xff08;如字符串、列表、元组等&#xff09;的长度。类型转换函数&#xff08;int()…

YOLOv11改进策略【损失函数篇】| 利用MPDIoU,加强边界框回归的准确性

一、背景 目标检测和实例分割中的关键问题&#xff1a; 现有的大多数边界框回归损失函数在不同的预测结果下可能具有相同的值&#xff0c;这降低了边界框回归的收敛速度和准确性。 现有损失函数的不足&#xff1a; 现有的基于 ℓ n \ell_n ℓn​范数的损失函数简单但对各种尺度…

vSAN06:ESA与OSA对比、ESA安装、新架构、工作方式、自动策略管理、原生快照、数据压缩、故障处理

目录 vSAN ESAvSAN ESA 安装ESA新架构ESA工作方式ESA自动策略管理自适应RAID5策略 原生快照支持数据压缩的改进ESA故障处理 vSAN ESA vSAN ESA 安装 流程和OSA完全一致&#xff0c;但要注意要勾选启用vSAN ESA ESA和OSA的底层架构不一样&#xff0c;但是UI上是一致的。 生产环…

使用Python编写你的第一个算法交易程序

背景 Background ​ 最近想学习一下量化金融&#xff0c;总算在盈透投资者教育&#xff08;IBKRCampus&#xff09;板块找到一篇比较好的算法交易入门教程。我在记录实践过程后&#xff0c;翻译成中文写成此csdn博客&#xff0c;分享给大家。 ​ 如果你的英语好可以直接看原文…

2024百度云智大会|百度大模型内容安全合规探索与实践

9月25日&#xff0c;2024百度云智大会在北京举办。会上&#xff0c;百度智能云分别针对算力、模型、AI 应用&#xff0c;全面升级百舸 AI 异构计算平台 4.0、千帆大模型平台 3.0 两大 AI 基础设施&#xff0c;并升级代码助手、智能客服、数字人三大 AI 原生应用产品。 在大模型…

[uni-app]小兔鲜-08云开发

uniCloud可以通过JS开发服务端,包含云数据库, 云函数, 云存储等功能, uniCloud可结合 uni-ui 组件库使用 效果展示: <picker>城市选择组件不支持h5端和APP端, 所以我们使用 <uni-data-picker>组件进行兼容处理 <uni-data-picker>的数据使用云数据库的数据 云…

K8s中pod的管理和优化

一、k8s中的资源 1.1 资源管理介绍 在kubernetes中&#xff0c;所有的内容都抽象 资源&#xff0c;用户需要通过操作资源来管理kubernetes。kubernetes的本质上就是一个集群系统&#xff0c;用户可以在集群中部署各种服务所谓的部署服务&#xff0c;其实就是在kubernetes集群中…

【D3.js in Action 3 精译_030】3.5 给 D3 条形图加注图表标签(下):Krisztina Szűcs 人物专访 + 3.6 本章小结

当前内容所在位置&#xff08;可进入专栏查看其他译好的章节内容&#xff09; 第一部分 D3.js 基础知识 第一章 D3.js 简介&#xff08;已完结&#xff09; 1.1 何为 D3.js&#xff1f;1.2 D3 生态系统——入门须知1.3 数据可视化最佳实践&#xff08;上&#xff09;1.3 数据可…

Hive3.x版本调优总结

文章目录 第 1 章 Explain 查看执行计划&#xff08;重点&#xff09;1.1 创建测试用表1&#xff09;建大表、小表和 JOIN 后表的语句2&#xff09;分别向大表和小表中导入数据 1.2 基本语法1.3 案例实操 第 2 章 Hive 建表优化2.1 分区表2.1.1 分区表基本操作2.1.2 二级分区2.…

WMS系统拣货管理的优化与创新

一、WMS系统拣货管理的重要性 随着电子商务的快速发展&#xff0c;物流仓储行业面临着巨大的挑战。订单量的激增导致传统的手工拣货方式难以满足需求&#xff0c;而WMS系统的引入则解决了这一问题。通过WMS系统&#xff0c;仓库可以实现自动化、智能化的拣货管理&#xff0c;大…

小米路由器R3Gv2安装openwrt记录

前言 小米路由器R3Gv2的硬件配置与小米路由器4A千兆版一致&#xff0c;但bootloader有所不同&#xff0c;因此openwrt的固件不要互刷。另外&#xff0c;R3Gv2和R3G、4A百兆版是不同的设备&#xff0c;切勿混淆。 硬件信息 OpenWrt参数页-Xiaomi MiWiFi 3G v2 CPU&#xff1a…

小猿口算APP脚本(协议版)

小猿口算是一款专注于数学学习的教育应用,主要面向小学阶段的学生。它提供多种数学练习和测试,包括口算、速算、应用题等。通过智能化的题目生成和实时批改功能,帮助学生提高数学计算能力。此外,它还提供详细的学习报告和分析,帮助家长和教师了解学生的学习进度和薄弱环节…

[含文档+PPT+源码等]精品大数据项目-基于python爬虫实现的大数据岗位的挖掘与分析

大数据项目——基于Python爬虫实现的大数据岗位的挖掘与分析&#xff0c;其背景主要源于以下几个方面&#xff1a; 一、大数据时代的来临 随着互联网、物联网、云计算等技术的快速发展&#xff0c;数据呈现出爆炸式增长。根据国际数据公司&#xff08;IDC&#xff09;的预测&…

新电脑 Windows 系统初始配置

文章目录 前言1 前置配置2 安装软件2.1 通讯工具2.2 后端开发工具2.3 硬件开发工具2.4 前端开发工具2.4 其它工具 3 Windows 11 优化4 写在最后 前言 分区&#xff08;个人习惯&#xff09;&#xff1a;1TB SSD 分为 2 个分区&#xff0c;一个 256GB 分区为系统盘&#xff0c;剩…

003 Springboot操作RabbitMQ

Springboot整合RabbitMQ 文章目录 Springboot整合RabbitMQ1.pom依赖2.yml配置3.配置队列、交换机方式一&#xff1a;直接通过配置类配置bean方式二&#xff1a;消息监听通过注解配置 4.编写消息监听发送测试5.其他类型交换机配置1.FanoutExchange2.TopicExchange3.HeadersExcha…

【AIGC】寻找ChatGPT最佳推理步骤:CoT思维链技术的探索与应用

博客主页&#xff1a; [小ᶻZ࿆] 本文专栏: AIGC | ChatGPT 文章目录 &#x1f4af;前言&#x1f4af;CoT思维链概述&#x1f4af;CoT思维链在大型语言模型中的应用&#x1f4af;CoT思维链改变对模型推理能力的理解和改进方式多样化应用场景挑战与未来发展总结 &#x1f4a…

力扣 前缀和

找子数组的个数了解前缀和的基础。 前缀和大致理解为到达某个位置&#xff0c;前面几个数的总和&#xff0c;即s[i1]s[i]a[i1]&#xff0c;可以通过一次循环获得。然后几个前缀和作差&#xff0c;即可得到某个位置到某个位置的和&#xff0c;根据map的键值对进行更新次数。 题…

【JavaEE】——回显服务器的实现

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;引入 1&#xff1a;基本概念 二&#xff1a;UDP socket API使用 1&#xff1a;socke…

高频股票期货ETF历史高频数据源

【数据源】 银河金融数据库&#xff08;yinhedata.com&#xff09; 提供金融数据股票、美股、期货以及ETF等高频tick数据&#xff0c;分钟级别数据。 MACD背离是指MACD指标与价格走势之间发生的方向性差异&#xff0c;这通常被视为市场可能发生趋势反转的信号。以下是一个具体…