使用 bend-ingest-kafka 将数据流实时导入到 Databend

作者:韩山杰

Databend Cloud 研发工程师

https://github.com/hantmac

Databend是一个开源、高性能、低成本易于扩展的新一代云数据仓库。bend-ingest-kafka 是一个专为 Databend 设计的实时数据导入工具,它允许用户从 Apache Kafka 直接将数据流导入到 Databend 中,实现数据的实时分析和处理。

为什么选择bend-ingest-kafka?

  • 实时性: 能够实时地从 Kafka 中读取数据并导入到 Databend。
  • 高吞吐量: 支持高并发的数据导入,满足大规模数据处理的需求。
  • 易用性: 提供了简单直观的配置方式,便于用户快速上手。
  • 灵活性: 可二次开发支持多种数据格式和自定义转换逻辑。

环境准备

在使用 bend-ingest-kafka 之前,需要确保以下环境已经搭建好:

  • 一个运行中的 Databend 实例或者在 Databend Cloud 中创建一个 warehouse(推荐)。
  • 一个配置好的 Apache Kafka 集群。
  • 已经安装的 bend-ingest-kafka。

快速开始

Step 1: 安装 bend-ingest-kafka

可以从 Databend 的官方 GitHub 仓库 release 页面 下载对应 OS 架构的 bend-ingest-kafka 的可执行二进制文件,或者直接执行命令安装最新版本。

go install  github.com/databendcloud/bend-ingest-kafka@latest

Step 2: 配置 bend-ingest-kafka

配置文件通常包括 Kafka 的连接以及配置信息、Databend 的连接信息以及数据转换的逻辑。以下是一个简单的配置示例:

{
  "kafkaBootstrapServers": "localhost:9092",
  "kafkaTopic": "ingest_test",
  "KafkaConsumerGroup": "test",
  "mockData": "",
  "isJsonTransform": false,
  "databendDSN": "https://cloudapp:password@tn3ftqihs--medium-p8at.gw.aws-us-east-2.default.databend.com:443",
  "databendTable": "default.kfk_test",
  "batchSize": 10,
  "batchMaxInterval": 5,
  "dataFormat": "json",
  "workers": 1,
  "copyPurge": false,
  "copyForce": false,
  "disableVariantCheck": true,
  "minBytes": 1024,
  "maxBytes": 1048576,
  "maxWait": 10,
  "useReplaceMode": false,
  "userStage": "~"
}

具体的配置参数可以参考 Parameter References,这里对几个比较重要的参数展开解释。

  • isJsonTransform: 默认为 true,将 Kafka Json 数据逐字段转换为 Databend 表数据。通过设置 isJsonTransform 为 true 来使用此模式。如果设置为 false 的话,系统将在 Databend 中自动创建一个 raw table, 列包括 (uuid, koffset, kpartition, raw_data, record_metadata, add_time),并将原始数据导入此表。其中 raw_data 为导入的 kafka Json 数据,record_metadata 包含了本条数据的 kafka 元信息 - topicpartitionoffsetcreate_timekey,方便用户查询。
  • useReplaceMode: useReplaceMode 是一种去重模式,开启后如果表中已存在数据,新数据将替换旧数据。但 useReplaceMode 仅在 isJsonTransform 为 false 时支持,因为它需要在目标表中添加 koffset 和 kpartition 字段。在这种模式下,系统可以实现 exactly once 的同步语义,否则为 at-least-once 语义。
  • userStage: 用户的自定义 external stage name。

Step 3: 启动数据导入

这里使用 raw-data 模式作演示。

Kafka 的 Json 数据示例为:
{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}
模拟 kafka 生产数据

可以使用下面的脚本快速生成 kafka json 数据:

from confluent_kafka import Producer

# 创建一个Producer实例
p = Producer({'bootstrap.servers': 'localhost:9092'})

for i in range(1000000):
    json_data = '{"i64": 10,"u64": 30,"f64": 20,"s": "hao","s2": "hello","a16":[1],"a8":[2],"d": "2011-03-06","t": "2016-04-04 11:30:00"}'
    p.produce('ingest_test', json_data)
    print(i)
    p.flush()
使用配置文件启动 bend-ingest-kafka

默认读取 ./config/conf.json 配置文件,开始将 Kafka 中的数据导入到 Databend。

./bend-ingest-kafka

启动后可以看到 log 和 metrics:

到 Databend 中可以查询到已经同步的数据:

 由于 raw_data 和 record_metadata 的字段格式都是 JSON ,所以可以很灵活地做一些数据分析:

select record_metadata['partition'] p,
                min(record_metadata['offset']::bigint) o1,
        max(record_metadata['offset']::bigint) o2,
        o2-o1+1 sub_count,
        count(distinct record_metadata['offset']) distinct_cnt,
        count(1) cnt
from default.kfk_test 
group by p
order by p;

高级特性

  • 错误处理: 能够处理数据导入过程中的异常,并提供重试机制。
  • 监控与日志: 提供详细的日志记录和监控指标,方便跟踪数据导入的状态。

结语

bend-ingest-kafka 作为一个强大的工具,为 Databend 用户提供了从 Kafka 实时导入数据的能力。通过本文的介绍,用户应该能够快速上手并利用这个工具来实现实时数据处理的需求。

关于 Databend

Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。
👨‍💻‍ Databend Cloud:https://databend.cn

📖 Databend 文档:https://docs.databend.cn/

💻 Wechat:Databend

✨ GitHub:https://github.com/datafuselabs/databend

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/769985.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MacOS下更新curl

苹果自带的curl不支持Https,我们可以通过curl -V看到如下结果 curl 7.72.0 (x86_64-apple-darwin18.6.0) libcurl/7.72.0 zlib/1.2.12 libidn2/2.3.7 librtmp/2.3 Release-Date: 2020-08-19 Protocols: dict file ftp gopher http imap ldap ldaps pop3 rtmp rtsp smtp telne…

LabVIEW汽车ECU测试系统

开发了一个基于LabVIEW开发的汽车发动机控制单元(ECU)测试系统。该系统使用了NI的硬件和LabVIEW软件,能够自动执行ECU的功能测试和性能测试,确保其在不同工作条件下的可靠性和功能性。通过自动化测试系统,大大提高了测…

基于xilinx FPGA的GTX/GTH/GTY位置信息查看方式(如X0Y0在bank几)

目录 1 概述2 参考文档3 查看方式4查询总结: 1 概述 本文用于介绍如何查看xilinx fpga GTX得位置信息(如X0Y0在哪个BANK/Quad)。 2 参考文档 《ug476_7Series_Transceivers》 《pg156-ultrascale-pcie-gen3-en-us-4.4》 3 查看方式 通过…

linux——IPC 进程间通信

IPC 进程间通信 interprocess communicate IPC(Inter-Process Communication),即进程间通信,其产生的原因主要可以归纳为以下几点: 进程空间的独立性 资源隔离:在现代操作系统中,每个进程都…

Hadoop-12-Hive 基本介绍 下载安装配置 MariaDB安装 3台云服务Hadoop集群 架构图 对比SQL HQL

章节内容 上一节我们完成了: Reduce JOIN 的介绍Reduce JOIN 的具体实现DriverMapperReducer运行测试 背景介绍 这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。 之前已经在 VM 虚拟机上搭建过一次&am…

独立开发者系列(18)——js的window对象

独立开发者&#xff0c;必然要面对JS代码&#xff0c;基本可以认为在脚本语言里面&#xff0c;JS门槛最低&#xff0c;正因为如此&#xff0c;JS也是最受欢迎的开发语言之一。JS的代码运行规律&#xff0c;按照代码模块执行&#xff0c;也就是<script></script> 每…

2024年上半年网络工程师下午真题及答案解析

试题一(20分) 某高校网络拓扑如下图所示&#xff0c;两校区核心&#xff08;CORE-1、CORE-2&#xff09;&#xff0c;出口防火墙&#xff08;NGFW-1、NGFW-2&#xff09;通过校区间光缆互联&#xff0c;配置OSPF实现全校路由收敛&#xff0c;两校区相距40km。两校区默认由本地…

「媒体邀约」苏州媒体宣传服务公司

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 媒体宣传加速季&#xff0c;100万补贴享不停&#xff0c;一手媒体资源&#xff0c;全国100城线下落地执行。详情请联系胡老师。 苏州的媒体资源相当丰富&#xff0c;涵盖了报纸、电视、广…

postman请求访问:认证失败,无法访问系统资源

1、使用postman时&#xff0c;没有传入相应的token&#xff0c;就会出现这种情况&#xff0c;此时需要把token放进去 发现问题: { "msg": "请求访问&#xff1a;/getInfo&#xff0c;认证失败&#xff0c;无法访问系统资源", "code": 401 } 1…

金属制品行业企业数字化转型实践

金属制品行业总体上存在着企业数量多、规模小、管理流程复杂等特点。而在数字化应用方面&#xff0c;调查显示&#xff1a;金属制品行业企业信息化总体应用水平低&#xff0c;信息系统建设水平尚处于一般事务处理和简单信息管理阶段&#xff0c;“信息孤岛”问题严重。在信息化…

最新发布!MySQL 9.0 的向量 (VECTOR) 类型文档更新

7月1日&#xff0c;MySQL 9.0.0 创新版本, 8.4.1 LTS, 8.0.38 三版齐发。 发版当天安装包已经可以下载&#xff0c;我也在第一时间做了分享&#xff1a; MySQL 9.0.0 新鲜出炉&#xff01;支持向量类型 当时参考手册还未上线&#xff0c;这两天文档虽已上线&#xff0c;但似乎仍…

RPM包管理-rpm命令管理

1.RPM包命令原则 所有的rpm包都在光盘中 例&#xff1a;httpd-2.2.15-15.e16.centos.1.i686.rpm httpd 软件包名 2.2.15 软件版本 15 软件发布的次数 e16.centos 适合的Linux平台 i686 适合的硬件平台…

springboot酒店管理系统-计算机毕业设计源码93190

目 录 摘 要 1 绪论 1.1 选题背景与意义 1.2开发现状 1.3论文结构与章节安排 2 酒店管理系统系统分析 2.1 可行性分析 2.1.1 技术可行性分析 2.1.2 经济可行性分析 2.1.3 法律可行性分析 2.2 系统功能分析 2.2.1 功能性分析 2.2.2 非功能性分析 2.3 系统用例分析…

【计算机视觉】基于OpenCV的直线检测

直线检测原理 霍夫变换是图像处理必然接触到的一个算法&#xff0c;它通过一种投票算法检测具有特定形状的物体,该过程在一个参数空间中通过计算累计结果的局部最大值得到一个符合该特定形状的集合作为霍夫变换结果&#xff0c;该方法可以进行圆&#xff0c;直线&#xff0c;椭…

docker安装ElasticSearchKibana

本文参考以下两篇文章 ✅ElasticSearch&Kibana 部署 云效 Thoughts 企业级知识库 (aliyun.com) docker安装ElasticSearch&Kibana - 飞书 安装elasticsearch 使用docker下载es&#xff1a; docker pull elasticsearch:8.13.0 挂载配置 创建挂在文件目录 mkdir…

Hadoop3:集群压测-读写性能压测

一、准备工作 首先&#xff0c;我们要知道&#xff0c;平常所说的网速和文件大小的MB是什么关系。 100Mbps单位是bit&#xff1b;10M/s单位是byte ; 1byte8bit&#xff0c;100Mbps/812.5M/s。 测试 配置102、103、104虚拟机网速 102上用Python开启一个文件下载服务&#x…

职升网:注会考试科目搭配策略建议!

一、CPA考试特点概述 CPA&#xff08;注册会计师&#xff09;考试是一个综合性极强的考试&#xff0c;分为专业阶段和综合阶段。专业阶段涵盖了《会计》、《审计》、《财务成本管理》、《税法》、《经济法》和《公司战略与风险管理》六大科目。这些科目不仅知识点繁多&#xf…

轻松搞定Docker!教你一键删除所有镜像!

大家好,我是CodeQi! 一位热衷于技术分享的码仔。 Docker 是一种流行的容器化平台,它提供了一种轻量级且可移植的方式来打包、分发和运行应用程序。 在使用 Docker 进行应用程序开发和部署时,我们通常会创建和使用各种镜像。然而,随着时间的推移,我们可能会积累大量的镜…

Ubuntu TensorRT安装

什么是TensorRT 一般的深度学习项目&#xff0c;训练时为了加快速度&#xff0c;会使用多 GPU 分布式训练。但在部署推理时&#xff0c;为了降低成本&#xff0c;往往使用单个 GPU 机器甚至嵌入式平台&#xff08;比如 NVIDIA Jetson&#xff09;进行部署&#xff0c;部署端也…

Keil用ST-LINK下载STM32程序后不自动运行

之后程序可以运行了&#xff0c;但是串口还没有输出&#xff0c;在debug模式下都是ok的。