kafka C++实现消费者

文章目录

  • 1 Kafka 消费者的逻辑
  • 2 Kafka 的C++ API
    • 2.1 RdKafka::Conf
    • 2.2 RdKafka::Event
    • 2.3 RdKafka::EventCb
    • 2.4 RdKafka::TopicPartition
    • 2.5 RdKafka::RebalanceCb
    • 2.6 RdKafka::Message
    • 2.7 RdKafka::KafkaConsumer(核心)
  • 3 Kafka 消费者客户端开发
    • 3.1 必要的参数配置(bootstrap.servers)
    • 3.2 订阅主题和分区
    • 3.3 消息消费
    • 3.4 完整示例代码
  • 4 位移提交
    • 4.1 自动提交
    • 4.2 手动提交
    • 4.3 提交API
  • 5 消费Rebalance机制
  • 6 其他重要的消费者参数
    • 6.1 fetch.min.bytes
    • 6.2 fetch.max.bytes
    • 6.3 fetch.max.wait.ms
    • 6.4 max.partition.fetch.bytes
    • 6.5 max.poll.records
    • 6.6 connections.max.idle.ms
    • 6.7 exclude.internal.topics
    • 6.8 receive.buffer.bytes
    • 6.9 send.buffer.bytes
    • 6.10 request.timeout.ms
    • 6.11 metadata.max.age.ms
    • 6.12 reconnect.backoff.ms
    • 6.13 retry.backoff.ms
    • 6.14 isolation.level
    • 6.15 session.timeout.ms
    • 6.16 max.poll.interval.ms
    • 6.17 heartbeat.interval.ms
  • 7 broker参数详解
    • 7.1 broker.id
    • 7.2 log.dirs
    • 7.3 zookeeper.connect
    • 7.4 listeners
    • 7.5 advertised.listeners
    • 7.6 unclean.leader.election.enable
    • 番外:ISR解释
    • 7.7 delete.topic.enable
    • 7.8 log.retention. {hours|minutes|ms }
    • 7.9 log.retention.bytes
    • 7.10 min.insync.replicas
    • 7.11 num.network threads (fastdfs 网络数据读取)
    • 7.12 num.io.threads (处理我们读取到网络数据)
    • 7.13 message.max. bytes
    • 7.14 log.segment.bytes (分片的概念)
  • 8 总结

1 Kafka 消费者的逻辑

  • 配置消费者客户端参数。
  • 创建相应的消费者实例。
  • 订阅主题。
  • 拉取消息并消费;
  • 提交消息位移;
  • 关闭消费者实例;

2 Kafka 的C++ API

2.1 RdKafka::Conf

见生成者实现文章。

2.2 RdKafka::Event

见生成者实现文章。

2.3 RdKafka::EventCb

见生成者实现文章。

2.4 RdKafka::TopicPartition

static TopicPartition * create(const std::string &topic, int partition);
//创建一个TopicPartition对象。

static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
//创建TopicPartition对象。

static void destroy (std::vector<TopicPartition*> &partitions);
//销毁所有TopicPartition对象。

const std::string & topic () const;
//返回Topic名称。

int partition ();
//返回分区号。

int64_t offset();
//返回位移。

void set_offset(int64_t offset);
//设置位移。

ErrorCode err();
//返回错误码。

2.5 RdKafka::RebalanceCb

virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * >
 &partitions)=0;

用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数。

再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。

C++封装示例:

class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
    static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)        // 打印当前获取的分区
    {
        for (unsigned int i = 0 ; i < partitions.size() ; i++)
            std::cerr << partitions[i]->topic() <<
                      "[" << partitions[i]->partition() << "], ";
        std::cerr << "\n";
    }

public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                       RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)
    {
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
        {
            consumer->assign(partitions);
            partition_count = (int)partitions.size();
        }
        else
        {
            consumer->unassign();
            partition_count = 0;
        }
    }
private:
    int partition_count;
};

2.6 RdKafka::Message

见生成者实现文章。

2.7 RdKafka::KafkaConsumer(核心)

KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。

static KafkaConsumer * create(Conf *conf, std::string &errstr);
创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。

ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions);
返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。

ErrorCode subscription(std::vector< std::string > &topics);
返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。

ErrorCode subscribe(const std::vector< std::string > &topics);
更新订阅Topic分区。

ErrorCode unsubscribe();
将当前订阅Topic取消订阅分区。

ErrorCode assign(const std::vector< TopicPartition * > &partitions);
将分配分区更新为partitions。

ErrorCode unassign();
停止消费并删除当前分配的分区。

Message * consume(int timeout_ms);
消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。

ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。

ErrorCode commitAsync();
异步提交位移。

ErrorCode commitSync(Message *message);
基于消息对单个topic+partition对象同步提交位移。

virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
对指定多个TopicPartition同步提交位移。

ErrorCode commitAsync(Message *message);
基于消息对单个TopicPartition异步提交位移。

virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
对多个TopicPartition异步提交位移。

ErrorCode close();
正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组)

virtual ConsumerGroupMetadata *groupMetadata () = 0;
返回本Consumer实例的Consumer Group的元数据。

ErrorCode position (std::vector<TopicPartition*> &partitions)
获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。

ErrorCode seek (const TopicPartition &partition, int timeout_ms)
定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。

ErrorCode offsets_store (std::vector<TopicPartition*> &offsets)
为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。

3 Kafka 消费者客户端开发

3.1 必要的参数配置(bootstrap.servers)

在创建消费者的时候以下以下三个选项是必选的:

  • bootstrap.servers:指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
  • group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
  • auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来消费指定的 topic时,对于该参数的配置,会有不同的语义。
    在这里插入图片描述
    配置示例:
std::string errorStr;
RdKafka::Conf::ConfResult errorCode;
m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

m_event_cb = new ConsumerEventCb;
errorCode = m_config->set("event_cb", m_event_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed: " << errorStr << std::endl;
}

m_rebalance_cb = new ConsumerRebalanceCb;
errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed: " << errorStr << std::endl;
}

errorCode = m_config->set("enable.partition.eof", "false", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed: " << errorStr << std::endl;
}

errorCode = m_config->set("group.id", m_groupID, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
if(errorCode != RdKafka::Conf::CONF_OK)
{
    std::cout << "Conf set failed: " << errorStr << std::endl;
}

3.2 订阅主题和分区

订阅主题,可以订阅多个。

ErrorCode subscribe (const std::vectorstd::string &topics);

也可以通过正则表达式方式一次订阅多个主题,比如 “topic-.*”, 则前缀为“topic-.”的主题都被订阅。

m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 获取最新的消息数据
errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
if (errorCode != RdKafka::Conf::CONF_OK) {
std::cout << "Topic Conf set failed: " << errorStr << std::endl;
}
errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
if (errorCode != RdKafka::Conf::CONF_OK) {
std::cout << "Conf set failed: " << errorStr << std::endl;
}
m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
if (m_consumer == NULL) {
std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
}
std::cout << "Created consumer " << m_consumer->name() << std::endl;
// 订阅主题, 可以订阅多个主题
RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);

3.3 消息消费

void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;  // 超时
        break;
    case RdKafka::ERR_NO_ERROR:     // 有消息进来
        std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
                  << msg->partition() << "] at offset " << msg->offset()
                  << " key: " << msg->key() << " payload: "
                  << (char*)msg->payload() << std::endl;
        break;
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}

void KafkaConsumer::pullMessage()
{
    // 订阅Topic
    RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
    }
    // 消费消息
    while(true)
    {
        RdKafka::Message *msg = m_consumer->consume(1000);
        msg_consume(msg, NULL);
        delete msg;
    }
}

3.4 完整示例代码

KafkaConsumer.h

#ifndef KAFKACONSUMER_H
#define KAFKACONSUMER_H

#pragma once

#include <string>
#include <iostream>
#include <vector>
#include <stdio.h>
#include "rdkafkacpp.h"

class KafkaConsumer
{
public:/**
     * @brief KafkaConsumer
     * @param brokers
     * @param groupID
     * @param topics
     * @param partition
     */
    explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                           const std::vector<std::string>& topics, int partition);
    void pullMessage();
    ~KafkaConsumer();
protected:
    std::string m_brokers;
    std::string m_groupID;
    std::vector<std::string> m_topicVector;
    int m_partition;
    RdKafka::Conf* m_config;
    RdKafka::Conf* m_topicConfig;
    RdKafka::KafkaConsumer* m_consumer;
    RdKafka::EventCb* m_event_cb;
    RdKafka::RebalanceCb* m_rebalance_cb;
};

#endif // KAFKACONSUMER_H

KafkaConsumer.cpp

#include "KafkaConsumer.h"

class ConsumerEventCb : public RdKafka::EventCb
{
public:
    void event_cb (RdKafka::Event &event)
    {
        switch (event.type())
        {
        case RdKafka::Event::EVENT_ERROR:
            if (event.fatal())
            {
                std::cerr << "FATAL ";
            }
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
                      event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;

        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;

        case RdKafka::Event::EVENT_THROTTLE:
            std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<
                      event.broker_name() << " id " << (int)event.broker_id() << std::endl;
            break;

        default:
            std::cerr << "EVENT " << event.type() <<
                      " (" << RdKafka::err2str(event.err()) << "): " <<
                      event.str() << std::endl;
            break;
        }
    }
};

class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:
    static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)        // 打印当前获取的分区
    {
        for (unsigned int i = 0 ; i < partitions.size() ; i++)
            std::cerr << partitions[i]->topic() <<
                      "[" << partitions[i]->partition() << "], ";
        std::cerr << "\n";
    }

public:
    void rebalance_cb (RdKafka::KafkaConsumer *consumer,
                       RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)
    {
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);
        if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
        {
            consumer->assign(partitions);
            partition_count = (int)partitions.size();
        }
        else
        {
            consumer->unassign();
            partition_count = 0;
        }
    }
private:
    int partition_count;
};

KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupID,
                             const std::vector<std::string>& topics, int partition)
{
    m_brokers = brokers;
    m_groupID = groupID;
    m_topicVector = topics;
    m_partition = partition;

    std::string errorStr;
    RdKafka::Conf::ConfResult errorCode;
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

    m_event_cb = new ConsumerEventCb;
    errorCode = m_config->set("event_cb", m_event_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    m_rebalance_cb = new ConsumerRebalanceCb;
    errorCode = m_config->set("rebalance_cb", m_rebalance_cb, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    errorCode = m_config->set("enable.partition.eof", "false", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    errorCode = m_config->set("group.id", m_groupID, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("bootstrap.servers", m_brokers, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("max.partition.fetch.bytes", "1024000", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }

    // partition.assignment.strategy  range,roundrobin

    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    // 获取最新的消息数据
    errorCode = m_topicConfig->set("auto.offset.reset", "latest", errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Topic Conf set failed: " << errorStr << std::endl;
    }
    errorCode = m_config->set("default_topic_conf", m_topicConfig, errorStr);
    if(errorCode != RdKafka::Conf::CONF_OK)
    {
        std::cout << "Conf set failed: " << errorStr << std::endl;
    }
    m_consumer = RdKafka::KafkaConsumer::create(m_config, errorStr);
    if(m_consumer == NULL)
    {
        std::cout << "Create KafkaConsumer failed: " << errorStr << std::endl;
    }
    std::cout << "Created consumer " << m_consumer->name() << std::endl;
}

void msg_consume(RdKafka::Message* msg, void* opaque)
{
    switch (msg->err())
    {
    case RdKafka::ERR__TIMED_OUT:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;  // 超时
        break;
    case RdKafka::ERR_NO_ERROR:     // 有消息进来
        std::cout << " Message in-> topic:" << msg->topic_name() << "partition:["
                  << msg->partition() << "] at offset " << msg->offset()
                  << " key: " << msg->key() << " payload: "
                  << (char*)msg->payload() << std::endl;
        break;
    default:
        std::cerr << "Consumer error: " << msg->errstr() << std::endl;
        break;
    }
}

void KafkaConsumer::pullMessage()
{
    // 订阅Topic
    RdKafka::ErrorCode errorCode = m_consumer->subscribe(m_topicVector);
    if (errorCode != RdKafka::ERR_NO_ERROR)
    {
        std::cout << "subscribe failed: " << RdKafka::err2str(errorCode) << std::endl;
    }
    // 消费消息
    while(true)
    {
        RdKafka::Message *msg = m_consumer->consume(1000);
        msg_consume(msg, NULL);
        delete msg;
    }
}

KafkaConsumer::~KafkaConsumer()
{
    m_consumer->close();
    delete m_config;
    delete m_topicConfig;
    delete m_consumer;
    delete m_event_cb;
    delete m_rebalance_cb;

}

main.cpp使用:

#include "KafkaConsumer.h"

int main(int argc, char* argv[])
{
    std::string brokers = "127.0.0.1:9092";
    std::vector<std::string> topics;
    topics.push_back("test");
    // topics.push_back("test2");
    std::string group = "testGroup";
    if(argc >= 2) {
        group = argv[1];
    }
    std::cout << "group " << group << std::endl;

    KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
    consumer.pullMessage();

    RdKafka::wait_destroyed(5000);
    return 0;
}

CMakeLists.txt

cmake_minimum_required(VERSION 2.8)

project(KafkaConsumer)

set(CMAKE_CXX_STANDARD 11)
set(CMAKE_CXX_COMPILER "g++")
set(CMAKE_CXX_FLAGS "-std=c++11 ${CMAKE_CXX_FLAGS}")
set(CMAKE_INCLUDE_CURRENT_DIR ON)

# Kafka头文件路径
include_directories(/usr/local/include/librdkafka)
# Kafka库路径
link_directories(/usr/local/lib)

aux_source_directory(. SOURCE)

#add_executable(${PROJECT_NAME} ${SOURCE})

#TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)

ADD_EXECUTABLE(${PROJECT_NAME} main.cpp KafkaConsumer.cpp) 
TARGET_LINK_LIBRARIES(${PROJECT_NAME} rdkafka++)

编译:

mkdir build
cd build
cmake ..
make

4 位移提交

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度,这样当 Consumer 发生故障重启之后,就能够从Kafka 中读取之前提交的位移值,然后从相应的位移处继续消费,从而避免整个消费过程重来一遍。
从用户的角度来说,位移提交分为自动提交和手动提交;从 Consumer 端的角度来说,位移提交分为同步提交和异步提交。

4.1 自动提交

自动提交默认全部为同步提交。
自动提交相关参数:
在这里插入图片描述

4.2 手动提交

手动提交可以自己选择是同步提交(commitSync)还是异步提交(commitAsync )。commitAsync 不能够替代 commitSync。commitAsync 的问题在于,出现问题时它不会自动重试,因为它是异步操作。

手动提交,需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果。可以利用 commitSync 的自动重试来规避那些瞬时错误,同时不希望程序总处于阻塞状态,影响 TPS。

同时使用 commitSync() 和 commitAsync():

  • 对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保Consumer 关闭前能够保存正确的位移数据。
  • 将两者结合后,既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性.

4.3 提交API

ErrorCode commitSync();

提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。

ErrorCode commitAsync();

异步提交位移

5 消费Rebalance机制

当kafka遇到如下四种情况的时候,kafka会触发Rebalance机制:

  1. 消费组成员发生了变更,比如有新的消费者加入了消费组组或者有消费者宕机。
  2. 消费者无法在指定的时间之内完成消息的消费。
  3. 消费组订阅的Topic发生了变化。
  4. 订阅的Topic的partition发生了变化。

6 其他重要的消费者参数

在 KafkaConsumer 中,除了前面讲的必要的客户端参数,大部分的参数都有合理的默认值,一般我们
也不需要去修改它们。不过了解这些参数可以让我们更好地使用消费者客户端,其中还有一些重要的参
数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进
行性能调优与故障排查。下面挑选一些重要的参数来做细致的讲解。

6.1 fetch.min.bytes

该参数用来配置 Consumer 在一次拉取请求(调用 poll() 方法)中能从 Kafka 中拉取的最小数据量,默
认值为1(B)。Kafka 在收到 Consumer 的拉取请求时,如果返回给 Consumer 的数据量小于这个参
数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数
的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取
了。

6.2 fetch.max.bytes

该参数与 fetch.min.bytes 参数对应,它用来配置 Consumer 在一次拉取请求中从Kafka中拉取的最大
数据量,默认值为52428800(B),也就是50MB。
如果这个参数设置的值比任何一条写入 Kafka 中的消息要小,那么会不会造成无法消费呢?很多资料对
此参数的解读认为是无法消费的,比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数
设定的值是一次拉取请求中所能拉取的最大数据量,那么显然1B<10B,所以无法拉取。这个观点是错误
的,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消
息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。
与此相关的,Kafka 中所能接收的最大消息的大小通过服务端参数 message.max.bytes(对应于主题端
参数 max.message.bytes)来设置。

6.3 fetch.max.wait.ms

这个参数也和 fetch.min.bytes 参数有关,如果 Kafka 仅仅参考 fetch.min.bytes 参数的要求,那么有可
能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms 参数用于指
定 Kafka 的等待时间,默认值为500(ms)。如果 Kafka 中没有足够多的消息而满足不了
fetch.min.bytes 参数的要求,那么最终会等待500ms。这个参数的设定和 Consumer 与 Kafka 之间的
延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

6.4 max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给 Consumer 的最大数据量,默认值为1048576(B),即1MB。
这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者
用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造
成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。

6.5 max.poll.records

这个参数用来配置 Consumer 在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的
大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。
如果用户的消息处理逻辑很轻量,默认的 500 条消息通常不能满足实际的消息处理速度 。

6.6 connections.max.idle.ms

这个参数用来指定在多久之后关闭闲置的连接,默认值是540000(ms),即9分钟。

6.7 exclude.internal.topics

Kafka 中有两个内部的主题: consumer_offsets 和 transaction_state。exclude.internal.topics 用来
指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用
subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没
有这个限制。

6.8 receive.buffer.bytes

同生产者。

6.9 send.buffer.bytes

同生产者。

6.10 request.timeout.ms

这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为30000(ms)。

6.11 metadata.max.age.ms

这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定
的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的 broker 加入。

6.12 reconnect.backoff.ms

这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,
默认值为50(ms)。这种机制适用于消费者向 broker 发送的所有请求。

6.13 retry.backoff.ms

这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故
障情况下频繁地重复发送,默认值为100(ms)。

6.14 isolation.level

这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和
“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽
略事务未提交的消息,即只能消费到LSO(LastStableOffset)的位置,默认情况下为
“read_uncommitted”,即可以消费到 HW(High Watermark)处的位置。

6.15 session.timeout.ms

非常重要的参数之一 !
session.timeout.ms 是 consumer group 检测组内成员发送崩溃的时间 。
假设你设置该参数为 5 分钟,那么当某个 group 成员突然崩攒了(比如被 kill -9 或岩机), 管理
group 的 Kafka 组件(即消费者组协调者,也称 group coordinator)有可能需要 5 分钟才能感知到这
个崩溃。显然我们想要缩短这个时间,让coordinator 能够更快地检测到 consumer 失败 。
这个参数还有另外一重含义 :consumer 消息处理逻辑的最大时间
倘若 consumer 两次 poll 之间的间隔超过了该参数所设置的阑值,那么 coordinator 就会认为这个
consumer 己经追不上组内其他成员的消费进度了,因此会将该 consumer 实例“踢出”组,该
consumer 负责的分区也会被分配给其他 consumer。
在最好的情况下,这会导致不必要的 rebalance,因为 consumer 需要重新加入 group 。更糟的是,对
于那些在被踢出 group 后处理的消息, consumer 都无法提交位移一一这就意味着这些消息在
rebalance 之后会被重新消费一遍。如果一条消息或一组消息总是需要花费很长的时间处理,那么
consumer 甚至无法执行任何消费,除非用户重新调整参数 。鉴于以上的“窘境”, Kafka 社区于 0
.10.1.0 版本对该参数的含义进行了拆分 。 在该版本及以后的版本中, session.timeout.ms 参数被明确
为“ coordinator 检测失败的时间” 。
因此在实际使用中,用户可以为该参数设置一个比较小的值,让 coordinator 能够更快地检测
consumer 崩溃的情况,从而更快地开启 rebalance,避免造成更大的消费滞后( consumer lag ) 。
目前该参数的默认值是 10 秒。

6.16 max.poll.interval.ms

Apache官网max.poll.interval.ms上的解释如下,消费者组中的一员在拉取消息时如果超过了设置的最
大拉取时间,则会认为消费者消费消息失败,kafka会重新进行重新负载均衡,以便把消息分配给另一个
消费组成员。
在一个典型的 consumer 使用场景中,用户对于消息的处理可能需要花费很长时间。这个参数就是用于
设置消息处理逻辑的
最大时间的 。 假设用户的业务场景中消息处理逻辑是把消息、“落地”到远程数据库中,且这个过程平均
处理时间是 2 分钟,那么用户仅需要将 max.poll.interval.ms 设置为稍稍大于 2 分钟的值即可,而不必
为 session. neout.ms 也设置这么大的值。
通过将该参数设置成实际的逻辑处理时间再结合较低的 session.timeout.ms 参数值,consumer group
既实现了快速的 consumer 崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的 rebalance 。

6.17 heartbeat.interval.ms

该参数和 request.timeout.ms 、max.poll.interval.ms 参数是最难理解的 consumer 参数 。 前面己经
讨论了后两个参数的含义,这里解析一下heartbeat.interval.ms的含义及用法 。
从表面上看,该参数似乎是心跳的问隔时间,但既然己经有了上面的 session.timeout.ms 用于设置超
时,为何还要引入这个参数呢?
这里的关键在于要搞清楚 consumer group 的其他成员,如何得知要开启新一轮 rebalance;当
coordinator 决定开启新一轮 rebalance 时,它会将这个决定以 REBALANCE_IN_PROGRESS 异常的形
式“塞进” consumer 心跳请求的 response 中,这样其他成员拿到 response 后才能知道它需要重新加入
group。显然这个过程越快越好,而heartbeat. interval.ms 就是用来做这件事情的 。
比较推荐的做法是设置一个比较低的值,让 group 下的其他 consumer 成员能够更快地感知新一轮
rebalance. 开启了。
注意,该值必须小于 session.timeout.ms !这很容易理解,毕竟如果 consumer 在
session.timeout.ms 这段时间内都不发送心跳, coordinator 就会认为它已经 dead,因此也就没有必
要让它知晓 coordinator 的决定了。

7 broker参数详解

broker 端参数需要在 Kafka 目录下的 config/server.properties 文件中进行设置。当前对于绝大多数的
broker 端参数而言, Kafka 尚不支持动态修改一一这就是说,如果要新增、修改,抑或是删除某些
broker 参数的话,需要重启对应的 broker 服务器。

7.1 broker.id

Kafka 使用唯一的一 个整数来标识每个 broker ,这就是 broker.id ;该参数默认是-1 。如果不指定,
Kafka 会自动生成一个唯一值;总之,不管用户指定什么都必须保证该值在 Kafka 集群中是唯一的,不
能与其他 broker 冲突 。
在实际使用中,推荐使用从0开始的数字序列,如0、1、2……

7.2 log.dirs

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

该参数指定了 Kafka 持久化消息的目录;非常重要的参数!
若不设置该参数, Kafka 默认使用tmp/kafka-logs 作为消息保存的目录(商用一定要自己设置目录)。

把消息保存在 tmp 录下,生产环境中是不可取的。若待保存的消息数量非常多,那么最好确保该文件夹
下有充足的磁盘空间。

该参数可以设置多个目录,以逗号分隔,比如**/home/kafka1,/home/kafka2** 。在实际使用过程中,
指定多个目录的做法通常是被推荐的,因为这样 Kafka 可以把负载均匀地分配到多个目录下。

若用户机器上有N块物理硬盘,那么设置 个目录(须挂载在不同磁盘上的目录)是一个很好的选择。N
个磁头可以同时执行写操作,极大地提升了吞吐量。
注意 ,这里的“均匀”是根据目录下的分区数进行比较的,而不是根据实际的磁盘空间。

7.3 zookeeper.connect

同样是非常重要的参数。主要是在zookeeper中,kafka的配置数据,有一个公共开始的跟节点;该参数
没有默认的值,如果不配置,则使用zookeeper的/目录;
例如:zkl :218 l,zk2:2181,zk3:2181/kafka_clusterl;/kafka_cluster就是kafka的配置的目录;配置了,
可以起到很好的隔离效果。这样管理 Kafka 集群将变得更加容易。

7.4 listeners

broker 监听器的 csv (comma-separated values)列表,格式是[协议://:主机名:端口],[协议]:[// 主机
名:端口]。
该参数主要用于客户端连接 broker 使用,可以认为是 broker 端开放给 clients的监听端口 如果不指定
主机名,则表示绑定默认网卡:如果0.0.0.0,则表示绑定所有网卡。 Kafka 当前支持的协议类型包括
PLAINTEXT、SSL以及 SASL SSL 等。

7.5 advertised.listeners

跟 listeners 类似,该参数也是用于发布给 clients 的监昕器;不过该参数主要用于 IaaS 环境,比如云上
的机器通常都配有多块网卡(私网网卡和公网网卡)。
对于这种机器,用户可以设置该参数绑定公网 IP 供外部 clients 使用,然后配置上面的 listeners 来绑定
私网 IP供broker间通信使用。
当然不设置该参数也是可以的,只是云上的机器很容易出现 clients 无法获取数据的问题,原因就是
listeners 绑定的是默认网卡,而默认网卡通常都是绑定私网的。
在实际使用场景中,对于配有多块网卡的机器而言,这个参数通常都是需要配置的。

7.6 unclean.leader.election.enable

是否开启 unclean leader 选举。Kafka 社区在1.0.0 版本才正式将该参数默认值调整为 false ;
即表明如果发生这种情况, Kafka 不允许从剩下存活的非 ISR 副本中选择一个当 leader。因为如果
true,这样做固然可以让 Kafka 继续提供服务给 clients ,但会造成消息数据的丢失,正式环境中,数据
不丢失是基本的业务需求;

番外:ISR解释

ISR 全称是in-sync replica ,翻译过来就是与 leader replica 保持同步的 replica 集合 ;一个partition 可
以配置N个replica ,那么这是否就意味着该 partition可以容忍 N-1 replica 失效而不丢失数据呢?答案
是“否”!
Kafka partition 动态维护 一个replica 集合。该集合中的所有 replica 保存的消息日志都与leader
replica 保持同步状态。只有这个集合中的 replica 才能被选举为 leader ,也只有该集合中所有 replica
都接收到了同一条消息, Kafka 才会将该消息置于“己提交”状态,即认为这条消息发送成功。
Kafka 中只要这个集合中至少存在一个 replica ,那些“己提交”状态的消息就不会丢失;
这句话的两个关键点:

  1. ISR 中至少存在一个“活着的”replica;
  2. replica “己提 ”消息 。

Kafka 对于没有提交成功的消息不做任何保证,只保证在 ISR 存活的情况下“己提交”的消息不会丢失。正
常情况下,partition 的所有 replica (含 leader replica )都应该与 leader replica 保持同步,即所有
replica 都在 ISR 中。
因为各种各样的原因,一小部分 replica 开始落后于 leader replica的进度 。当滞后 一定程度时, Kafka
会将这些 replica “踢”出 ISR;相反地,当这replica 重新追上leader进度时候,kafka会再次把它们加回
ISR中。

7.7 delete.topic.enable

是否允许 Kafka 删除 topic 。
默认情况下, Kafka 集群允许用户删除topic 及其数据。这样当用户发起删除 topic 操作时, broker
端会执行 topic 删除逻辑。
在实际生产环境中我们发现允许 Kafka 删除 opic 其实是一个很方便的功能,再加上自Kafka 0.0 新增的
ACL 权限特性,以往对于误操作和恶意操作的担心完全消失了,因此设置该参数为 true 是推荐的做法。

7.8 log.retention. {hours|minutes|ms }

这组参数控制了消息数据的留存时间;默认的留存时间是7天(168小时);即 Kafka 只会保存最近 7天的
数据,井自动删除 7天前的数据。
当前较新版本的Kafka 会根据消息的时间戳信息进行留存与否的判断;老版本消息格式没有时间戳,
Kafka 会根据日志文件的最近修改时间( last modified time )进行判断。
三个参数如果同时设置,优先选取ms的设置,minutes次之,hours最后。

7.9 log.retention.bytes

这个参数定义了空间维度上的留存策略;参数默认值是-1,表示 Kafka 永远不会根据消息日志文件总大
小来删除日志。
对于大小超过该参数的分区日志而言, Kafka 会自动清理该分区的过期日志段文件。

7.10 min.insync.replicas

该参数表示kafka存储的最小副本数,producer发送数据的时候,指定了 broker 端必须成功响应
clients 消息发送的最少副本数;
该参数其实是与 producer 端的 acks 参数配合使用的 ;并且min.insync.replicas 也只有在 acks=-1 时
才有意义;acks=-1 表示 producer端寻求最高等级的持久化保证;
假如 broker 端无法满足该条件,则 clients 的消息发送并不会被视为成功。它与 acks 配合使用可以令
Kafka集群达成最高等级的消息持久化;
举 个例子,假设某个topic 的每个分区的副本数是3 ,那么推荐设置该参数为 2,这样我们就能够容忍
一台broker 宕机而不影响服务;若设置参数为3 ,那么只要任何一台 broker 岩机,整个Kafka 集群将
无法继续提供服务。

7.11 num.network threads (fastdfs 网络数据读取)

一个 broker 在后台用于处理网络请求的线程数,默认是3 。
broker启动时会创建多个线程处理来自其他broker和clients 发送过来的各种请求。会将接收到的请求转
发到后面的处理线程中。在真实的环境中,用户需要不断地监控NetworkProcessorAvgldlePercent JMX
指标; 如果该指标持续低于0.3 ;建议适当增加该参数的值。

7.12 num.io.threads (处理我们读取到网络数据)

这个参数就是控制 broker 端实际处理网络请求的线程数,默认值是8;
Kafka broker 默认创建 8个线程以轮询方式不停地监昕转发过来的网络请求井进行实时处理。 Kafka 同
样也为请求处理提供了一个 JMX 监控指标 RequestHandler A vgldlePercent。如果发现该指标持续低于
0.3 ,则可以考虑适当增加该参数的值。

7.13 message.max. bytes

Kafka broker 能够接收的最大消息大小,默认是 977kB;还不到1MB ,可见是非常小的。
在实际使用场景中,突破 1MB 大小的消息十分常见,因此用户有必要综合考虑 Kafka 集群可能处理的
最大消息尺寸井设置该参数值

7.14 log.segment.bytes (分片的概念)

(默认: 1GB) – kafka数据文件的大小,确保这个数值大于一个消息的长度。一般说来使用默认值即可(一
般一个消息很难大于1G,因为这是一个消息系统,而不是文件系统)。

# The maximum size of a log segment file. When this size is reached a new log
segment will be created.
#log.segment.bytes=1073741824
# 配置成5M做测试
log.segment.bytes==5242880

8 总结

RdKafka提供了两种消费者API,低级API的Consumer和高级API的KafkaConsumer。
Kafka Consumer使用流程:
(1)创建Kafka配置实例。
(2)创建Topic配置实例。
(3)设置Kafka配置实例Broker属性。
(4)设置Topic配置实例属性。
(5)注册回调函数。
(6)创建Kafka Consumer客户端实例。
(7)创建Topic实例。
(8)订阅主题。
(9)消费消息。
(10)关闭消费者实例。
(11)销毁释放RdKafka资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/226327.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

springboot监听器模式源码精讲

1.前言 很多时候我们看源码的时候看不下去&#xff0c;其中一个原因是系统往往使用了许多设计模式&#xff0c;如果你不清楚这些设计模式&#xff0c;这无疑增加了你阅读源码的难度。 springboot中就大量使用了设计模式&#xff0c;本文主要介绍其中的一种监听器模式&#xf…

谈谈 .NET8 平台中对 LiteDB 的 CRUD 操作

哪个啥&#xff01;纯 C# 编写的 LiteDB 你还不会操作&#xff1f; LiteDB 简介LiteDB 安装1、同步版 LiteDB2、异步版 LiteDB.Async LiteDB StudioLiteDB CRUD 操作举例1、.net cli 命令创建项目2、项目添加相关 nuget 包3、改造项目结构4、改造项目代码 LiteDB vs SQLite 对比…

泳道图绘制全攻略,一图胜千言,快速上手

泳道图是一种流程图的形式&#xff0c;通过在不同的泳道中展示不同的参与者&#xff0c;帮助我们更好地理解和分析流程。它是一种非常有用的工具&#xff0c;可以帮助我们在团队协作、流程管理和问题解决等方面取得更好的效果。 1. 泳道图的定义 泳道图是一种以泳道为基础的流程…

postgresql从入门到精通 - 第37讲:postgres物理备份和恢复概述

PostgreSQL从小白到专家&#xff0c;是从入门逐渐能力提升的一个系列教程&#xff0c;内容包括对PG基础的认知、包括安装使用、包括角色权限、包括维护管理、、等内容&#xff0c;希望对热爱PG、学习PG的同学们有帮助&#xff0c;欢迎持续关注CUUG PG技术大讲堂。 第37讲&#…

提高工厂能源效率的关键:工厂能耗监测平台

工业做为能源消耗的重要场所&#xff0c;所以节能减排对工业来讲是一个亟需解决的问题。除了对设备进行更新换代外&#xff0c;还需要能源管理消耗监测平台&#xff0c;帮助企业实现节能减排的目标。 工厂能源消费量非常庞大&#xff0c;能源比较难以监测与控制。传统能源的管…

路径规划之RRT算法

系列文章目录 路径规划之Dijkstra算法 路径规划之Best-First Search算法 路径规划之A *算法 路径规划之D *算法 路径规划之PRM算法 路径规划之RRT算法 路径规划之RRT算法 系列文章目录前言一、RRT算法1.起源2.流程3. 优缺点3.1 优点3.2 缺点 4. 实际效果 前言 PRM方法相比于传…

正则表达式(3):入门

正则表达式&#xff08;3&#xff09;&#xff1a;入门 小结 本博文转载自 从这篇文章开始&#xff0c;我们将介绍怎样在Linux中使用”正则表达式”&#xff0c;如果你想要学习怎样在Linux中使用正则表达式&#xff0c;这些文章就是你所需要的。 在认识”正则表达式”之前&am…

图像处理之把模糊的图片变清晰

1.图片如果是有雾化效果的对图像产生影响的,要先进行图形增强,Retinex是基于深度神经网络了,我在之前图形处理的文章一路从神经网络(概率统计)—>积卷神经网络(对区域进行概率统计,对图片进行切割多个识别对象)–>深度积卷神经网络(RetinexNet也是模拟人脑的处理过程,增加…

挑选分支中某一个提交进行合并

复制提交的哈希(sha-1)值 挑选提交 git cherry-pick 复制过来的哈希值 若有冲突&#xff0c;解决冲突&#xff0c;没有冲突&#xff0c;即合并完成

C语言普里姆(Prim)算法实现计算国家建设高铁运输网最低造价的建设方案

背景&#xff1a; 描述&#xff1a;为促进全球更好互联互通&#xff0c;亚投行拟在一带一路沿线国家建设高铁运输网&#xff0c;请查阅相关资料 画出沿线国家首都或某些代表性城市的连通图&#xff0c;为其设计长度最短或造价最低的高铁建设方案。 要求&#xff1a;抽象出的图…

Linux-进程之间的通信

目录 ​编辑 一.什么是进程之间的通信 二.进程之间的通信所访问的数据 三.进程之间的通信是如何做到的 四.基于内存文件级别的通信方式——管道 1.什么是管道 2.管道的建立过程——匿名管道 a.什么是匿名管道 b.匿名管道特点&#xff1a; c.使用匿名管道的…

Peter算法小课堂—贪心算法

课前思考&#xff1a;贪心是什么&#xff1f;贪心如何“贪”&#xff1f; 课前小视频&#xff1a;什么是贪心算法 - 知乎 (zhihu.com) 贪心 贪心是一种寻找最优解问题的常用方法。 贪心一般将求解过程分拆成若干个步骤&#xff0c;自顶向下&#xff0c;解决问题 太戈编程第…

邮政单号查询,邮政快递物流查询,并进行提前签收分析

批量查询邮政快递单号的物流信息&#xff0c;并将提前签收件分析筛选出来。 所需工具&#xff1a; 一个【快递批量查询高手】软件 邮政快递单号若干 操作步骤&#xff1a; 步骤1&#xff1a;运行【快递批量查询高手】软件&#xff0c;第一次使用的朋友记得先注册&#xff0c…

关于svn如何上传一个完整的项目

注意&#xff1a;请一定要按照该步骤进行操作&#xff0c;请上传新项目时将项目名称进行规范命名 例如原始文件是arrange_v2 将此项目需要注入新的医院 则命名为 arrange_某医院名称_门诊或者医技或者药房_v2 重新命名文件夹名称快捷键 &#xff08;F12&#xff09; 一 &…

【Linux】公网远程访问AMH服务器管理面板

目录 1. Linux 安装AMH 面板2. 本地访问AMH 面板3. Linux安装Cpolar4. 配置AMH面板公网地址5. 远程访问AMH面板6. 固定AMH面板公网地址 AMH 是一款基于 Linux 系统的服务器管理面板&#xff0c;它提供了一系列的功能&#xff0c;包括网站管理、FTP 管理、数据库管理、DNS 管理、…

UI自动化测试工具的定义及重要性

UI自动化测试工具在现代软件开发中起着不可或缺的作用。它们能够提高测试效率、减少人为错误、提供全面的测试覆盖&#xff0c;并支持持续集成。通过有效使用UI自动化测试工具&#xff0c;开发团队可以提高软件质量&#xff0c;提供更可靠的应用程序&#xff0c;满足用户的需求…

Jsoup爬取HTTPS页面数据资源,并导入数据库(Java)

一、实现思路 示例页面&#xff1a; 2020年12月中华人民共和国县以上行政区划代码 忽略https请求的SSL证书通过Jsoup获取页面标签遍历行标签&#xff0c;分别获取每个行标签的第二个和第三个列标签将获取到的行政代码和单位名称分别插入sql语句占位符执行sql语句&#xff0c…

掌汇云 | 全场景数据追踪,多维了解用户偏好,提高运营效率

掌汇云拥有黄金“三件套”&#xff1a;掌头条、汇互动、云品牌。群硕借助这些功能套件&#xff0c;面向细分领域如&#xff1a;会展&#xff0c;食品饮料、医药以及工业等&#xff0c;定制综合性信息服务平台&#xff0c;提供资讯、商机、企业人脉、上下游资源、活动等高质量服…

<软考>软件设计师-3程序设计语言基础(总结)

(一) 程序设计语言概述 1 程序设计语言的基本概念 1-1 程序设计语言的目的 程序设计语言是为了书写计算机程序而人为设计的符号语言&#xff0c;用于对计算过程进行描述、组织和推导。 1-2 程序语言分类 低级语言 : 机器语言&#xff08;计算机硬件只能识别0和1的指令序列)&…

docker网络【重点】

一、网络知识 1、桥接模式&#xff1a;用于链接两个不同网络段的设备&#xff0c;是共享通信的一种方式 2、桥接设备&#xff1a;工作在OSI模型的第二层&#xff08;数据链路层&#xff09;。根据MAC地址转发数据帧&#xff0c;类似于交换机&#xff0c;只能转发同一网段&…