1. 启动zookeeper
Kafka依赖zookeeper, 首先安装zookeeper
-p:设置映射端口(默认2181
)
docker run --name zookeeper \
--network app-tier \
-e ALLOW_ANONYMOUS_LOGIN=yes \
--restart=always \
-d bitnami/zookeeper:latest
2. 启动kafka
docker run --name kafka \
--network app-tier \
-p 9092:9092 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
--restart=always \
-d bitnami/kafka:latest
命令 | 解释 |
---|---|
ALLOW_PLAINTEXT_LISTENER=yes | 任何人可以访问 |
KAFKA_CFG_ZOOKEEPER_CONNECT | zookeeper地址 |
KAFKA_CFG_ADVERTISED_LISTENERS | 当前kafka安装的主机地址 如果是服务器部署则配服务器IP或域名否则客户端监听消息会报地址错误 |
2. 启动kafka-map管理工具
docker run --name kafka-map \
--network app-tier \
-p 9001:8080 \
-v /usr/local/kafka-map/data:/usr/local/kafka-map/data \
-e DEFAULT_USERNAME=admin \
-e DEFAULT_PASSWORD=admin \
--restart=always \
-d dushixiang/kafka-map:latest
启动成功后, 访问客户端: http://localhost:9001
账户: admin
密码: admin
3. springboot集成kafka
pom.xml配置
<dependencies>
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
配置application.yml
#------------------------------------spring----------------------------------
spring:
#------------------------------------消息队列kafka配置----------------------------------
kafka:
# kafka server的地址,如果有多个,使用逗号分割
bootstrap-servers: localhost:9092
producer:
# 发生错误后,消息重发的次数。
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
batch-size: 16384
# 设置生产者内存缓冲区的大小。32MB的批处理缓冲区
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
acks: 1
properties:
# 自定义拦截器
interceptor.classes: com.wms.message.kafka.interceptor.CustomProducerInterceptor
#自定义分区器
partitioner.classes: com.wms.message.kafka.interceptor.CustomPartitioner
consumer:
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
auto-commit-interval: 1S
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
enable-auto-commit: false
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
# 自定义消费者拦截器
interceptor.classes: com.wms.message.kafka.interceptor.CustomConsumerInterceptor
# 默认消费者组
group-id: code-safe-group
# 设置最大轮询间隔时间(毫秒),默认值为 300000(5分钟)
# 如果两次 poll() 之间的时间超过此配置值,可能导致 rebalance, 消费者会被剔除 此处设置10分钟
max-poll-interval-ms: 600000
# 批量一次最大拉取数据量
max-poll-records: 1000
batch:
# 批消费并发量,小于或等于Topic的分区数
concurrency: 3
listener:
# 在侦听器容器中运行的线程数。
concurrency: 5
#listner负责ack,每调用一次,就立即commit
ack-mode: manual_immediate
missing-topics-fatal: false
topics:
# 自定义主题名称
twsm: webSocket_send_message_dev
group-id: group-id
topic-name:
- topic1
测试发送消息到kafka
/**
* Kafka测试
*
* @version 1.0
* @author: web
* @date: 2024/1/18 15:07
*/
@Slf4j
@RestController
@RequestMapping("/message/kafkaTest")
public class KafkaTestController extends BaseController
{
@Autowired
private KafkaUtils kafkaUtils;
/**
* 生产者_推送消息到kafka
*
* @param msg
* @author: web
* @return: AjaxResult
* @date: 2024/1/18 15:16
*/
@PostMapping("/send")
public AjaxResult send(@RequestBody Map<String, Object> msg)
{
try
{
String userId = msg.get("userId").toString();
Object content = msg.get("content");
Message message = kafkaUtils.setMessage(userId, content);
kafkaUtils.send(KafkaUtils.TOPIC_TEST, message);
}
catch (Exception e)
{
log.error("生产者_推送消息到kafka发生异常");
}
return success();
}
/**
* 消费者1
*
* @param record
* @param ack
* @param topic
* @author: web
* @return: void
* @date: 2024/1/18 15:07
*/
@KafkaListener(topics = KafkaUtils.TOPIC_TEST)
public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
{
Optional message = Optional.ofNullable(record.value());
if (message.isPresent())
{
Object msg = message.get();
log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
/**
* 消费者2
*
* @param record
* @param ack
* @param topic
* @author: web
* @return: void
* @date: 2024/1/18 15:07
*/
// @KafkaListener(topics = KafkaUtils.TOPIC_TEST, groupId = KafkaUtils.TOPIC_GROUP2)
// public void topicTest2(ConsumerRecord<?, ?> record, Acknowledgment ack,
// @Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
// {
//
// Optional message = Optional.ofNullable(record.value());
// if (message.isPresent())
// {
// Object msg = message.get();
// log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
// ack.acknowledge();
// }
// }
}
KafkaUtils类
/**
* 生产者
*
* @version: 1.0
* @author: web
* @date: 2024/1/18 10:37
*/
@Component
@Slf4j
public class KafkaUtils
{
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 自定义topic
*/
public static final String TOPIC_TEST = "topic.code-safe";
/**
* 自定义消费组
*/
public static final String TOPIC_GROUP1 = "topic.group1";
public static final String TOPIC_GROUP2 = "topic.group2";
// 业务相关topic
/**
* 主题: webSocket发送消息到客户端
*/
public static String TOPIC_WEBSOCKET_SEND_MESSAGE;
@Autowired
private String[] kafkaTopicName;
/**
* 获取配置文件中的盐值,并设置到静态变量中
*
* @param topic 主题
*/
@Value("${spring.kafka.topics.twsm}")
private void setTwsmTopic(String topic)
{
TOPIC_WEBSOCKET_SEND_MESSAGE = topic;
}
/**
* 发送消息
*
* @param topic 主题
* @param message 消息内容
* @author: web
* @return: void
* @date: 2024/1/18 10:42
*/
public void send(String topic, Object message)
{
if (StringUtils.isEmpty(topic) || StringUtils.isNull(message))
{
throw new ServiceException("生产者发送消息到kafka_主题或消息内容不能为空!");
}
String obj2String = JsonUtils.toJsonString(message);
// log.info("准备发送消息为:{}", obj2String);
//发送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj2String);
// 监听回调
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>()
{
@Override
public void onFailure(Throwable throwable)
{
//发送失败的处理
log.error(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> stringObjectSendResult)
{
//成功的处理
// log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
}
});
}
/**
* 设置websocket发送的消息体
*
* @param userId 用户ID
* @param msg 消息内容
* @author: web
* @return: Message 消息对象
* @date: 2024/1/19 11:36
*/
public Message setMessage(String userId, Object msg)
{
Message message = new Message();
message.setSendUserId(userId);
message.setSendTime(DateUtils.getTime());
message.setSendContent(String.valueOf(msg));
return message;
}
}
Message类
@Data
public class Message implements Serializable
{
private static final long serialVersionUID = -118L;
/**
* 发送人ID
*/
private String sendUserId;
/**
* 发送人
*/
// private String sendUserName;
/**
* 发送时间
*/
private String sendTime;
/**
* 发送内容
*/
private String sendContent;
}
监听消息
/**
* 消息接收监听器【分布式系统】
*
* @version: 1.0
* @author: web
* @date: 2024/1/19 13:44
*/
@Component
@Slf4j
public class MessageListener
{
/**
* 根据用户id发送消息到客户端
*
* @param record
* @param ack
* @param topic
* @author: web
* @return: void
* @date: 2024/1/20 22:05
*/
@KafkaListener(topics = "#{'${spring.kafka.topics.twsm}'}", groupId = "#{topicGroupId}")
public void sendMessageByUserId(ConsumerRecord<String, String> record, Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic)
{
Optional<String> optional = Optional.ofNullable(record.value());
if (optional.isPresent())
{
Message message = JsonUtils.parseObject(optional.get(), Message.class);
if (StringUtils.isNull(message))
{
log.error("消费者收到kafka消息的内容为空!");
return;
}
// log.info("消费者收到kafka消息");
String sendUserId = message.getSendUserId();
String sendContent = message.getSendContent();
// 确认收到消息
ack.acknowledge();
}
}
}