概念
消费者组(Consumer Group):由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者;
整体流程
流程说明:
- 消费者组包括多个消费者,每个消费者只能消费分区中的一部分数据;
- 当一个消费者组中的消费者读取一个分区中的数据时,其他消费者就不能再读取该分区中的数据;
- 一个消费者组可以有多个消费者,每个消费者只能消费分配给该消费者组的某些主题的某些分区;
- 同一个分区只会被一个消费者组中的一个消费者消费,不同消费者组之间可以重复消费;
- 当消费者组中的某个消费者宕机后,Kafka会将该消费者所消费的分区重新分配给其他消费者,从而实现消费者的高可用性;
- 消费者组中的消费者可以动态加入和退出,Kafka会自动重新分配分区;
- 在同一个消费者组内,消费者之间可以进行负载均衡,以此来提高消息的吞吐量和消费的效率;
- 消费者组可以通过消费者组ID(groupid)来标识,一个消费者组ID可以同时消费多个主题;
配置参数说明
参数名称 | 描述 |
---|---|
bootstrap.servers | 向Kafka集群建立初始连接用到的host/port列表。 |
key.deserializer和value.deserializer | 指定接收消息的key和value的反序列化类型。一定要写全类名。 |
group.id | 标记消费者所属的消费者组。 |
enable.auto.commit | 默认值为true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms | 如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。 |
auto.offset.reset | 当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。 |
offsets.topic.num.partitions | __consumer_offsets的分区数,默认是50个分区。 |
heartbeat.interval.ms | Kafka消费者和coordinator之间的心跳时间,默认3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。 |
session.timeout.ms | Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。 |
max.poll.interval.ms | 消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。 |
fetch.min.bytes | 默认1个字节。消费者获取服务器端一批消息最小的字节数。 |
fetch.max.wait.ms | 默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。 |
fetch.max.bytes | 默认Default: 52428800(50 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。 |
max.poll.records | 一次poll拉取数据返回消息的最大条数,默认是500条。 |
分区策略
- Range
# 特点
确保每个消费者消费的分区数量是均衡的。
注意:Rangle范围分配策略是针对每个Topic的。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RangeAssignor。
# 算法公式
n = 分区数量 / 消费者数量
m = 分区数量 % 消费者数量
前m个消费者消费n+1个,剩余消费者消费n个。
- RoundRobin
# 特点
将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询方式逐个将分区以此分配给每个消费者。
# 配置
配置消费者的partition.assignment.strategy为org.apache.kafka.clients.consumer.RoundRobinAssignor。
- Sticky
# 特点
在执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
粘性分区是Kafka从0.11.x版本开始引入这种分配策略,首先会尽量均衡的放置分区到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。
- CooperativeSticky
可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range+ CooperativeSticky。
回调函数说明
事件回调
- 设置回调
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(event_cb) failed, errorStr:%s\n", errorStr.c_str());
return;
}
- 回调处理
// 设置事件回调
class ConsumerEventCb : public RdKafka::EventCb
{
public:
void event_cb(RdKafka::Event &event)
{
switch (event.type())
{
case RdKafka::Event::EVENT_ERROR:
break;
case RdKafka::Event::EVENT_STATS:
break;
case RdKafka::Event::EVENT_LOG:
break;
case RdKafka::Event::EVENT_THROTTLE:
break;
default:
break;
}
}
};
消费者组再平衡回调
- 设置回调
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
ELogError(("%s|Conf set(rebalance_cb) failed, errorStr:%s", GET_CODE_INFO(), errorStr.c_str()));
break;
}
- 回调处理
// 设置消费者组再平衡回调
// 注册该函数会关闭 rdkafka 的自动分区赋值和再分配
class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
// 打印当前获取的分区
static void printTopicPartition(const std::vector<RdKafka::TopicPartition *>&partitions)
{
for (unsigned int i = 0; i < partitions.size(); i++)
{
printf("count:%d, topic:%s,partition:%d\n",
i,
partitions[i]->topic().c_str(),
partitions[i]->partition());
}
}
public:
// 消费者组再平衡回调
void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition *> &partitions)
{
printf("RebalanceCb: %s", RdKafka::err2str(err).c_str());
printTopicPartition(partitions);
// 分区分配成功
if (RdKafka::ERR__ASSIGN_PARTITIONS == err)
{
// 消费者订阅这些分区
consumer->assign(partitions);
// 获取消费者组本次订阅的分区数量,可以属于不同的topic
m_partitionCount = (int)partitions.size();
}
else // 分区分配失败
{
// 消费者取消订阅所有的分区
consumer->unassign();
// 消费者订阅分区的数量为0
m_partitionCount = 0;
}
}
private:
int m_partitionCount; // 消费者组本次订阅的分区数量
};
流程(c++)
- 配置消费者客户端;
- 订阅主题和分区;
- 拉取消息;
- 处理消息;
- 提交消费位移;
配置消费者客户端
int CKafkaConsumer::Create()
{
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
do
{
// 1、创建配置对象
// 1.1、构造 consumer conf 对象
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(nullptr == m_config)
{
printf("Create RdKafka Conf failed.\n");
break;
}
// 必要参数1:指定 broker 地址列表
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(bootstrap.servers) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 必要参数2:设置消费者组 id
errorCode = m_config->set("group.id", m_groupID, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(group.id) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 设置事件回调
m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(event_cb) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 设置消费者组再平衡回调
m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(rebalance_cb) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 当消费者到达分区结尾,发送 RD_KAFKA_RESP_ERR__PARTITION_EOF 事件
errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(enable.partition.eof) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 每次最大拉取的数据大小
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(max.partition.fetch.bytes) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 设置分区分配策略:range、roundrobin、cooperative-sticky
errorCode = m_config->set("partition.assignment.strategy", "range", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(partition.assignment.strategy) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 心跳探活超时时间---1s
errorCode = m_config->set("session.timeout.ms", "6000", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(session.timeout.ms) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 心跳保活间隔
errorCode = m_config->set("heartbeat.interval.ms", "2000", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(heartbeat.interval.ms) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 1.2、创建 topic conf 对象
m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if (nullptr == m_topicConfig)
{
printf("Create RdKafka Topic Conf failed.\n");
break;
}
// 必要参数3:设置新到来消费者的消费起始位置,latest 消费最新的数据,earliest 从头开始消费
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Topic Conf set(auto.offset.reset) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 默认 topic 配置,用于自动订阅 topics
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if (RdKafka::Conf::CONF_OK != errorCode)
{
printf("Conf set(default_topic_conf) failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
// 2、创建 Consumer 对象
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if (nullptr == m_consumer)
{
printf("Create KafkaConsumer failed, errorStr:%s.\n",
errorStr.c_str());
break;
}
printf("Created consumer success, consumerName:%s.\n",
m_consumer->name().c_str());
return 0;
} while (0);
Destroy();
return -1;
}
订阅主题和分区
std::vector<std::string> topicsVec;
topicsVec.push_back("zd_test_topic_one");
topicsVec.push_back("zd_test_topic_two");
RdKafka::ErrorCode errorCode = m_consumer->subscribe(topicsVec);
if (RdKafka::ERR_NO_ERROR != errorCode)
{
printf("Subscribe failed, errorStr:%s\n", RdKafka::err2str(errorCode).c_str());
return;
}
拉取消息
// 可放到线程中处理
while (m_running)
{
RdKafka::Message *msg = m_consumer->consume(1000); // 1000ms超时
if(NULL != msg)
{
// 消费消息
ConsumeMsg_(msg, NULL);
m_consumer->commitAsync();
delete msg;
}
}
处理消息
void KafkaConsumer::ConsumeMsg_(RdKafka::Message *msg, void *opaque)
{
switch (msg->err())
{
case RdKafka::ERR__TIMED_OUT: // 超时
break;
case RdKafka::ERR_NO_ERROR: // 有消息进来
printf("Message in, topic:%s, partition:[%d], key:%s, payload:%s\n",
msg->topic_name().c_str(),
msg->partition(),
msg->key()->c_str(),
(char *)msg->payload());
// 消息处理
break;
default:
break;
}
}
提交消费位移
m_consumer->commitAsync();