librdkafka的简单使用

文章目录

    • 摘要
    • kafka是什么
    • 安装环境
    • librdkafka的简单使用
      • 生产者
      • 消费者

摘要

本文是Getting Started with Apache Kafka and C/C++的中文版, kafka的hello world程序。

本文完整代码见仓库,这里只列出producer/consumer的代码


kafka是什么

本节来源:Kafka - 维基百科,自由的百科全书、Kafka入门简介 - 知乎

首先我们得知道什么是Kafka。

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

在这里插入图片描述

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。
  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消费者可以指定偏移量来指定要消费的消息。

安装环境

上一节,kafka的概念看着比较简单,发布-订阅/生产-消费的模型。

为了可以调用Kafka的C/C++ API, 需要先安装环境。

# almlinux8
# dnf search kafka
# dnf install librdkafka-devel

# dnf search glib
# dnf install glib2-devel

# ubuntu22
# 开发库apt install librdkafka-dev  libglib2.0-dev

# 安装docker环境apt install docker.io docker-compose

# 本地安装 Kafka
## ref: https://docs.confluent.io/confluent-cli/current/install.html#cpwget -qO - https://packages.confluent.io/confluent-cli/deb/archive.key | sudo apt-key add
➜ add-apt-repository "deb https://packages.confluent.io/confluent-cli/deb stable main"apt install confluent-cli

## 启动kafka
## usage: https://docs.confluent.io/confluent-cli/current/command-reference/local/kafka/confluent_local_kafka_start.html
## error: https://stackoverflow.com/questions/63776518/error-2-matches-found-based-on-name-network-nameofservice-default-is-ambiguo
## error:https://stackoverflow.com/questions/77985757/kafka-in-docker-using-confluent-cli-doesnt-workwhereis confluent
confluent: /usr/bin/confluent

➜ export CONFLUENT_HOME=/usr/bin/confluent

# 我执行下面命令后,没有看到Plaintext Ports信息
➜ confluent local kafka start

# 停止,然后重新启动,管用了
➜ confluent local kafka stop
➜ confluent local kafka start

The local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.html


Pulling from confluentinc/confluent-local
Digest: sha256:ad62269bf4766820c298f7581cf872a49f46a11dbaebcccb4fd2e71049288c5b
Status: Image is up to date for confluentinc/confluent-local:7.6.0
+-----------------+-------+
| Kafka REST Port | 8082  |
| Plaintext Ports | 43465 |
+-----------------+-------+
Started Confluent Local containers "8d72d911a4".
To continue your Confluent Local experience, run `confluent local kafka topic create <topic>` and `confluent local kafka topic produce <topic>`.

# Create a new topic, purchases, which you will use to produce and consume events.
➜ confluent local kafka topic create purchases
Created topic "purchases".

librdkafka的简单使用

confluenceinc/librdkafka是Apache Kafka协议的 C 库实现 ,提供生产者、消费者和管理客户端。

下面运行的程序来自:Apache Kafka and C/C++ - Getting Started Tutorial

代码中kafka的API可以查询:librdkafka: librdkafka documentation

代码中使用了glib库,日常开发我不会使用这个库,因为感觉比较冷,它的API可查询:GLib – 2.0: Automatic Cleanup


生产者

总体逻辑:

  • 从配置文件中加载配置
  • 创建生产者
  • 生产者发送消息
#include <glib.h>
#include <librdkafka/rdkafka.h>

#include "common.c"

#define ARR_SIZE(arr) ( sizeof((arr)) / sizeof((arr[0])) )

/* Optional per-message delivery callback (triggered by poll() or flush())
 * when a message has been successfully delivered or permanently
 * failed delivery (after retries).
 */
static void dr_msg_cb (rd_kafka_t *kafka_handle,
                       const rd_kafka_message_t *rkmessage,
                       void *opaque) {
    if (rkmessage->err) {
        g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
    }
}

int main (int argc, char **argv) {
    rd_kafka_t *producer;
    rd_kafka_conf_t *conf;
    char errstr[512];

    // Parse the command line.
    if (argc != 2) {
        g_error("Usage: %s <config.ini>", argv[0]);
        return 1;
    }

    // Parse the configuration.
    // See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    const char *config_file = argv[1];

    g_autoptr(GError) error = NULL;
    g_autoptr(GKeyFile) key_file = g_key_file_new();
    if (!g_key_file_load_from_file (key_file, config_file, G_KEY_FILE_NONE, &error)) {
        g_error ("Error loading config file: %s", error->message);
        return 1;
    }

    // Load the relevant configuration sections.
    conf = rd_kafka_conf_new();
    load_config_group(conf, key_file, "default");

    // Install a delivery-error callback.
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    // Create the Producer instance.
    producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        g_error("Failed to create new producer: %s", errstr);
        return 1;
    }

    // Configuration object is now owned, and freed, by the rd_kafka_t instance.
    conf = NULL;

    // Produce data by selecting random values from these lists.
    int message_count = 10;
    const char *topic = "purchases";
    const char *user_ids[6] = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
    const char *products[5] = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};

    for (int i = 0; i < message_count; i++) {
        const char *key =  user_ids[random() % ARR_SIZE(user_ids)];
        const char *value =  products[random() % ARR_SIZE(products)];
        size_t key_len = strlen(key);
        size_t value_len = strlen(value);

        rd_kafka_resp_err_t err;

        err = rd_kafka_producev(producer,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_KEY((void*)key, key_len),
                                RD_KAFKA_V_VALUE((void*)value, value_len),
                                RD_KAFKA_V_OPAQUE(NULL),
                                RD_KAFKA_V_END);

        if (err) {
            g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
            return 1;
        } else {
            g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);
        }

        rd_kafka_poll(producer, 0);
    }

    // Block until the messages are all sent.
    g_message("Flushing final messages..");
    rd_kafka_flush(producer, 10 * 1000);

    if (rd_kafka_outq_len(producer) > 0) {
        g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
    }

    g_message("%d events were produced to topic %s.", message_count, topic);

    rd_kafka_destroy(producer);

    return 0;
}

消费者

总体逻辑:

  • 从配置文件中加载配置
  • 创建消费者
  • 订阅topic
  • 轮询消费者的消息
#include <glib.h>
#include <librdkafka/rdkafka.h>

#include "common.c"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop(int sig) { run = 0; }

int main(int argc, char **argv) {
  rd_kafka_t *consumer;
  rd_kafka_conf_t *conf;
  rd_kafka_resp_err_t err;
  char errstr[512];

  // Parse the command line.
  if (argc != 2) {
    g_error("Usage: %s <config.ini>", argv[0]);
    return 1;
  }

  // Parse the configuration.
  // See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  const char *config_file = argv[1];

  g_autoptr(GError) error = NULL;
  g_autoptr(GKeyFile) key_file = g_key_file_new();
  if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE,
                                 &error)) {
    g_error("Error loading config file: %s", error->message);
    return 1;
  }

  // Load the relevant configuration sections.
  conf = rd_kafka_conf_new();
  load_config_group(conf, key_file, "default");
  load_config_group(conf, key_file, "consumer");

  // Create the Consumer instance.
  consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  if (!consumer) {
    g_error("Failed to create new consumer: %s", errstr);
    return 1;
  }
  rd_kafka_poll_set_consumer(consumer);

  // Configuration object is now owned, and freed, by the rd_kafka_t instance.
  conf = NULL;

  // Convert the list of topics to a format suitable for librdkafka.
  const char *topic = "purchases";
  rd_kafka_topic_partition_list_t *subscription =
      rd_kafka_topic_partition_list_new(1);
  rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA);

  // Subscribe to the list of topics.
  err = rd_kafka_subscribe(consumer, subscription);
  if (err) {
    g_error("Failed to subscribe to %d topics: %s", subscription->cnt,
            rd_kafka_err2str(err));
    rd_kafka_topic_partition_list_destroy(subscription);
    rd_kafka_destroy(consumer);
    return 1;
  }

  rd_kafka_topic_partition_list_destroy(subscription);

  // Install a signal handler for clean shutdown.
  signal(SIGINT, stop);

  // Start polling for messages.
  while (run) {
    rd_kafka_message_t *consumer_message;

    consumer_message = rd_kafka_consumer_poll(consumer, 500);
    if (!consumer_message) {
      g_message("Waiting...");
      continue;
    }

    if (consumer_message->err) {
      if (consumer_message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        /* We can ignore this error - it just means we've read
         * everything and are waiting for more data.
         */
      } else {
        g_message("Consumer error: %s",
                  rd_kafka_message_errstr(consumer_message));
        return 1;
      }
    } else {
      g_message("Consumed event from topic %s: key = %.*s value = %s",
                rd_kafka_topic_name(consumer_message->rkt),
                (int)consumer_message->key_len, (char *)consumer_message->key,
                (char *)consumer_message->payload);
    }

    // Free the message when we're done.
    rd_kafka_message_destroy(consumer_message);
  }

  // Close the consumer: commit final offsets and leave the group.
  g_message("Closing consumer");
  rd_kafka_consumer_close(consumer);

  // Destroy the consumer.
  rd_kafka_destroy(consumer);

  return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/496992.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# 高级文件操作与异步编程探索(初步)

文章目录 文本文件的读写探秘StreamReader 类深度剖析StreamWriter 类细节解读编码和中文乱码的解决方案 二进制文件的读写BinaryReader 类全面解析BinaryWriter 类深度探讨 异步编程与C#的未来方向同步与异步&#xff1a;本质解读Task 的神奇所在async/await 的魔法 在现代编程…

NOIP,CSP-J,CSP-S——树

一、树 概念: 节点、深度、路径、边 树的直径 真题: 答案:B 答案:A 一个树的边是n-1 现在是m,所以m-(n-1)=m-n+1

Elasticsearch 向量搜索

目标记录 ["你好&#xff0c;我的爱人","你好&#xff0c;我的爱妻","你好&#xff0c;我的病人","世界真美丽"] 搜索词 爱人 预期返回 ["你好&#xff0c;我的爱人","你好&#xff0c;我的爱妻"] 示例代码…

解决 vue activited 无效问题

当对页面APP.vue组件router-view标签使用了keep-alive之后在组件activated状态时不会发送请求&#xff0c;这时需要使用 keep-alive标签的 exclude属性排除需要重新发送请求的组件。需要注意exclude的值要和组件本身的name值要一致&#xff0c;如果不一致就会不生效。目前我出现…

element-ui checkbox 组件源码分享

简单分享 checkbox 组件&#xff0c;主要从以下三个方面来分享&#xff1a; 1、组件的页面结构 2、组件的属性 3、组件的方法 一、组件的页面结构 二、组件的属性 2.1 value / v-model 属性&#xff0c;绑定的值&#xff0c;类型 string / number / boolean&#xff0c;无…

46.continue语句

目录 一.continue语句 二.视频教程 一.continue语句 continue语句的作用和break语句很像&#xff0c;break语句会跳出当前循环&#xff0c;而continue语句则是跳出本次循环&#xff0c;继续执行下一次循环。 举个例子&#xff1a; #include <stdio.h>void main(void)…

想学网络安全,从哪里开始?网络安全的学习路线

网络安全学习路线&#xff1a; 想学习网络安全专业的知识&#xff0c;想当黑客&#xff0c;但是不知道该从哪里开始学。 我给你一个路线&#xff01; 清晰图片和大纲&#xff1a;https://docs.qq.com/doc/DU1lpVFpSbWVrd2p3

【每周精选资讯 | 第 1 期】2024-03-11 ~ 2024-03-17

前言 大家好&#xff0c;我是翼同学。这里是【每周精选资讯】的第一期内容。 GPT 递给我苹果 Figure展示了与OpenAI合作的最新进展&#xff0c;通过结合先进的神经网络&#xff0c;使机器人能够执行类似人类的快速、灵巧动作。主要功能包括描述周围环境、常识推理、将高层次…

Android ImageView以及实现截图

实现效果 截图前 截图后 代码 package cn.jj.huaweiad;import android.annotation.SuppressLint; import android.graphics.Bitmap; import android.os.Bundle; import android.os.Handler; import android.util.Log; import android.view.View; import android.view.ViewGro…

螺旋矩阵的算法刷题

螺旋矩阵的算法刷题 本文主要涉及螺旋矩阵的算法 包括三个题目分别是 59. 螺旋矩阵 II54. 螺旋矩阵 中等LCR 146. 螺旋遍历二维数组 文章目录 螺旋矩阵的算法刷题一 、螺旋矩阵简单1.1 实现一&#xff08;我认为这个方法更巧妙&#xff01;&#xff01;&#xff09;1.2 实现二&…

Redis入门到实战-第十七弹

Redis实战热身t-digest篇 完整命令参考官网 官网地址 声明: 由于操作系统, 版本更新等原因, 文章所列内容不一定100%复现, 还要以官方信息为准 https://redis.io/Redis概述 Redis是一个开源的&#xff08;采用BSD许可证&#xff09;&#xff0c;用作数据库、缓存、消息代理…

2024最新华为OD机试试题库全 -【二叉树计算】- C卷

1. 🌈题目详情 1.1 ⚠️题目 给出一个二叉树如下图所示: 请由该二叉树生成一个新的二叉树,它满足其树中的每个节点将包含原始树中的左子树和右子树的和。 左子树表示该节点左侧叶子节点为根节点的一颗新树;右子树表示该节点右侧叶子节点为根节点的一颗新树。 1.2 �…

自养号测评,补单的五大关键要素

现在很多大卖都是自己管理几百个账号&#xff0c;交给服务商不是特别靠谱 第一 你不知道服务商账号质量怎么样 第二 账号一天下了多少你也不清楚&#xff0c;如果下了很多单万一封号被关联了怎么办 第三 你也不知道服务商用什么卡给你下单&#xff0c;用一些低汇率和黑卡下单…

运用开关量信号远程传输装置实现工厂智能化技改需要分几步走

DTD509H系列开关量信号无线传输器由一个无线信号发射终端和一个无线信号接收终端组成&#xff0c;也可以根据用户现场实现一点对多点或者多点对一点的无线控制。DTD509H系列开关量信号无线传输器可与PLC的IO端口、继电器、二次仪表、传感器等工业设备配套使用&#xff0c;运用无…

【前端Vue】社交信息头条项目完整笔记第2篇:二、登录注册,准备【附代码文档】

社交媒体-信息头条项目完整开发笔记完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;一、项目初始化使用 Vue CLI 创建项目,加入 Git 版本管理,调整初始目录结构,导入图标素材,引入 Vant 组件库,移动端 REM 适配,关于 , 配置文件,封装请求模块。十、用户关…

快速上手Spring Cloud 十四:璀璨物联网之路

快速上手Spring Cloud 一&#xff1a;Spring Cloud 简介 快速上手Spring Cloud 二&#xff1a;核心组件解析 快速上手Spring Cloud 三&#xff1a;API网关深入探索与实战应用 快速上手Spring Cloud 四&#xff1a;微服务治理与安全 快速上手Spring Cloud 五&#xff1a;Spring …

Java项目:78 springboot学生宿舍管理系统的设计与开发

作者主页&#xff1a;舒克日记 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文中获取源码 项目介绍 系统的角色&#xff1a;管理员、宿管、学生 管理员管理宿管员&#xff0c;管理学生&#xff0c;修改密码&#xff0c;维护个人信息。 宿管员管理公寓…

iOS - Runtime-API

文章目录 iOS - Runtime-API1. Runtime应用1.1 字典转模型1.2 替换方法实现1.3 利用关联对象给分类添加属性1.4 利用消息转发机制&#xff0c;解决方法找不到的异常问题 2. Runtime-API2.1 Runtime API01 – 类2.1.1 动态创建一个类&#xff08;参数&#xff1a;父类&#xff0…

轻松赚钱,精彩生活:上班族副业赚钱新攻略大揭秘!

薪水总是捉襟见肘&#xff0c;每月账单总让人倍感压力。你是否曾在静谧的夜晚&#xff0c;躺在床上&#xff0c;思索如何为家庭多赚一分钱&#xff1f;其实&#xff0c;你并不孤单。在这个充满机遇与挑战的时代&#xff0c;越来越多的人开始寻找副业&#xff0c;以期望让生活更…

Appium设备交互API

设备交互API指的是操作设备系统中的一些固有功能&#xff0c;而非被测程序的功能&#xff0c;例如模拟来电&#xff0c;模拟发送短信&#xff0c;设置网络&#xff0c;切换横竖屏&#xff0c;APP操作&#xff0c;打开通知栏&#xff0c;录屏等。 模拟来电 make_gsm_call(phon…