1. pom
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. 生产者
import com. alibaba. fastjson. JSON ;
import com. alibaba. fastjson. serializer. SerializerFeature ;
import com. xxx. npi. module. common. msg. dto. MsgBase ;
import org. springframework. beans. factory. annotation. Value ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. stereotype. Service ;
import java. util. ArrayList ;
import java. util. List ;
/**
 * Sends {@link MsgBase} payloads to Kafka as JSON strings.
 *
 * <p>A message is handed to the {@link KafkaTemplate} only when the configured
 * {@code npi.default-url} equals {@code https://npi.xxx.com}; with any other
 * value both {@code sendMessage} overloads return without sending anything.
 */
@Service
public class MyMessageProducerService {

    /** The only value of {@code npi.default-url} for which messages are sent. */
    private static final String PUBLISHING_DOMAIN = "https://npi.xxx.com";

    /** fastjson features applied to every outgoing payload. */
    private static final SerializerFeature[] JSON_FEATURES = {
        SerializerFeature.WriteDateUseDateFormat,
        SerializerFeature.WriteMapNullValue,
        SerializerFeature.WriteNullListAsEmpty,
        SerializerFeature.WriteNullStringAsEmpty,
        SerializerFeature.DisableCircularReferenceDetect
    };

    /** Injected from {@code npi.default-url}. */
    @Value("${npi.default-url}")
    private String domain;

    private final KafkaTemplate<String, String> kafkaTemplate;

    public MyMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends a single message; it is wrapped in a one-element list before
     * serialization, so the payload has the same shape as the list overload.
     *
     * @param topicName target Kafka topic
     * @param msgObj    message to send
     */
    public <T extends MsgBase> void sendMessage(String topicName, T msgObj) {
        List<T> wrapped = new ArrayList<>();
        wrapped.add(msgObj);
        publish(topicName, wrapped);
    }

    /**
     * Sends a list of messages as one serialized payload.
     *
     * @param topicName target Kafka topic
     * @param list      messages to send
     */
    public <T extends MsgBase> void sendMessage(String topicName, List<T> list) {
        publish(topicName, list);
    }

    /** Serializes {@code payload} and sends it, unless the domain gate is closed. */
    private void publish(String topicName, Object payload) {
        if (!PUBLISHING_DOMAIN.equals(domain)) {
            return;
        }
        String json = toJsonString(payload);
        kafkaTemplate.send(topicName, json);
    }

    /** Converts {@code obj} to JSON using the fixed feature set above. */
    private String toJsonString(Object obj) {
        return JSON.toJSONString(obj, JSON_FEATURES);
    }
}
3. 配置
import org. apache. kafka. clients. producer. ProducerConfig ;
import org. apache. kafka. common. config. SslConfigs ;
import org. springframework. beans. factory. annotation. Value ;
import org. springframework. context. annotation. Bean ;
import org. springframework. context. annotation. Configuration ;
import org. springframework. core. io. Resource ;
import org. springframework. kafka. annotation. EnableKafka ;
import org. springframework. kafka. core. DefaultKafkaProducerFactory ;
import org. springframework. kafka. core. KafkaTemplate ;
import org. springframework. kafka. core. ProducerFactory ;
import java. io. File ;
import java. io. IOException ;
import java. io. InputStream ;
import java. nio. file. Files ;
import java. nio. file. StandardCopyOption ;
import java. util. HashMap ;
import java. util. Map ;
/**
 * Spring configuration that builds the Kafka producer beans from the
 * {@code spring.kafka.producer.*} properties.
 *
 * <p>The truststore is injected as a Spring {@link Resource} and copied to a
 * temporary file, because the producer is configured with a filesystem path
 * ({@code ssl.truststore.location}) rather than a stream.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${spring.kafka.producer.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.retries}")
    private int retries;

    @Value("${spring.kafka.producer.acks}")
    private String acks;

    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;

    @Value("${spring.kafka.producer.linger-ms}")
    private int lingerMs;

    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.producer.security.protocol}")
    private String securityProtocol;

    @Value("${spring.kafka.producer.ssl.truststore.location}")
    private Resource sslTruststoreLocationResource;

    @Value("${spring.kafka.producer.ssl.truststore.password}")
    private String sslTruststorePassword;

    @Value("${spring.kafka.producer.sasl.mechanism}")
    private String saslMechanism;

    @Value("${spring.kafka.producer.sasl.jaas.config}")
    private String saslJaasConfig;

    /**
     * @return a template for sending String-keyed, String-valued records,
     *         backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * @return a producer factory configured from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Assembles the Kafka producer properties from the injected values.
     *
     * <p>Each call copies the truststore resource to a new temporary file.
     *
     * @return a mutable map of producer configuration properties
     * @throws RuntimeException if the truststore resource cannot be read or copied
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        // No constant holder for these keys is imported in this file, so they stay as literals.
        props.put("security.protocol", securityProtocol);
        props.put("sasl.mechanism", saslMechanism);
        props.put("sasl.jaas.config", saslJaasConfig);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, copyTruststoreToTempFile().getAbsolutePath());
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);
        return props;
    }

    /** Copies the truststore resource to a temp file that is removed when the JVM exits. */
    private File copyTruststoreToTempFile() {
        // try-with-resources: the resource stream must be closed once the copy is done.
        try (InputStream inputStream = sslTruststoreLocationResource.getInputStream()) {
            File tempFile = File.createTempFile("client_truststore", ".jks");
            // Avoid accumulating one stale truststore copy per application start.
            tempFile.deleteOnExit();
            Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            return tempFile;
        } catch (IOException e) {
            throw new RuntimeException("Failed to locate truststore file", e);
        }
    }
}
4. application
spring:
  kafka:
    producer:
      bootstrap-servers: n2.ikt.xxx.com:9092,n3.ikt.xxx.com:9092,n4.ikt.xxx.com:9092,n5.ikt.xxx.com:9092,n6.ikt.xxx.com:9092
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 1
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      security.protocol: SASL_SSL
      ssl.truststore.location: classpath:client_truststore.jks
      ssl.truststore.password: pwd
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd';
    # NOTE: the original indentation was lost; placing `topic` under `spring.kafka` is an assumption - adjust to match where the application reads these keys.
    topic:
      br: mdscinpi.mdscinpi-data.tst
      mem: mdscinpi.msdcinpi-data.tst
      fbr: mdscinpi.inpi-data.tst
      cr: mdscinpi.npi-data.tst