ClickHouse10-ClickHouse中Kafka表引擎

Kafka表引擎也是一种常见的表引擎,在很多大数据量的场景下,会从源通过Kafka将数据输送到ClickHouse,Kafka作为输送的方式,ClickHouse作为存储引擎与查询引擎,大数据量的数据可以得到快速的、高压缩的存储。
在这里插入图片描述

Kafka大家肯定不陌生:

  • 它可以用于发布和订阅数据流,是常见的队列使用方式
  • 它可以组织容错存储,是常见的容错存储的使用方式
  • 它可以在流可用时对其进行处理,是常见的大数据处理的使用方式

全文概览:

  • 基本语法
  • 从 Kafka 写入到 ClickHouse
  • 从 ClickHouse 写入到 Kafka
    • 测试1:queue->ck->queue
    • 测试2:ck->queue

基本语法

分为定义表结构和定义Kafka的接入参数,Kafka的接入参数都是常见的字段

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [ALIAS expr1],
    name2 [type2] [ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port',
    kafka_topic_list = 'topic1,topic2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_schema = '',]
    [kafka_num_consumers = N,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_client_id = '',]
    [kafka_poll_timeout_ms = 0,]
    [kafka_poll_max_batch_size = 0,]
    [kafka_flush_interval_ms = 0,]
    [kafka_thread_per_consumer = 0,]
    [kafka_handle_error_mode = 'default',]
    [kafka_commit_on_select = false,]
    [kafka_max_rows_per_message = 1];

示例:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(
    `sys_time` Datetime COMMENT '',
    `num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

从 Kafka 写入到 ClickHouse

创建topic:

bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic test_ck_sync1

创建同步表:

CREATE TABLE IF NOT EXISTS test_ck_sync1
(
    `sys_time` Datetime COMMENT '',
    `num` UInt32 COMMENT ''
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'test_ck_sync1', kafka_group_name = 'ck_test_ck_sync1', kafka_format = 'CSV', kafka_max_block_size = 200000, kafka_skip_broken_messages = 1000, kafka_row_delimiter = '\n', format_csv_delimiter = '|'

CREATE TABLE IF NOT EXISTS test_ck_sync1_res
(
    `sys_time` Datetime COMMENT '',
    `num` UInt32 COMMENT ''
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(sys_time)
ORDER BY tuple()

创建物化视图,进行数据样式的转换:

CREATE MATERIALIZED VIEW test_ck_sync1_mv TO test_ck_sync1_res AS
SELECT
    sys_time,
    num
FROM test_ck_sync1

通过console写入数据:

[$ kafka_2.13-3.6.1]# bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_ck_sync1
>2024-01-01 00:00:01|89  

验证数据:

$ :) select * from test_ck_sync1_res;

SELECT *
FROM test_ck_sync1_res

Query id: a666f893-5be9-4022-9327-3a1507aa5485

┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:01 │  89 │
└─────────────────────┴─────┘
┌────────────sys_time─┬─num─┐
│ 2024-01-01 00:00:00 │  88 │
└─────────────────────┴─────┘

2 rows in set. Elapsed: 0.049 sec.

从 ClickHouse 写入到 Kafka

kafka_writers_reader --(view)--> kafka_writers_queue ---> 

创建一个队列:

bin/kafka-topics.sh --topic kafka_writers --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

创建同步表:

CREATE TABLE kafka_writers_reader (     `id` Int,     `platForm` String,     `appname` String,     `time` DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092', kafka_topic_list = 'kafka_writers_reader', kafka_group_name = 'kafka_writers_reader_group', kafka_format = 'CSV';

CREATE TABLE kafka_writers_queue (     id Int,     platForm String,     appname String,     time DateTime ) 
ENGINE = Kafka SETTINGS kafka_broker_list = '127.0.0.1:9092',        kafka_topic_list = 'kafka_writers',        kafka_group_name = 'kafka_writers_group',        kafka_format = 'CSV',       kafka_max_block_size = 1048576;

测试1:queue->ck->queue

通过写入队列kafka_writers_reader,借助ClickHouse写入队列kafka_writers

bin/kafka-topics.sh --topic kafka_writers_reader --create -bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic kafka_writers_reader

bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers

测试2:ck->queue

通过写入表kafka_writers_reader,写入队列kafka_writers

$ :) INSERT INTO kafka_writers_reader (id, platForm, appname, time) 
VALUES (8,'Data','Test','2020-12-23 14:45:31'), 
(9,'Plan','Test1','2020-12-23 14:47:32'), 
(10,'Plan','Test2','2020-12-23 14:52:15'), 
(11,'Data','Test3','2020-12-23 14:54:39');

INSERT INTO kafka_writers_reader (id, platForm, appname, time) FORMAT Values

Query id: 223a63ab-97fa-488d-8ea7-c2e194155d26

Ok.

4 rows in set. Elapsed: 1.054 sec. 

[$ kafka_2.13-3.6.1]# bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic kafka_writers
8,"Data","Test","1970-01-01 08:00:00"

9,"Plan","Test1","1970-01-01 08:00:00"

10,"Plan","Test2","1970-01-01 08:00:00"

11,"Data","Test3","1970-01-01 08:00:00"

如果喜欢我的文章的话,可以去GitHub上给一个免费的关注吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/494063.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

免费SSL证书和付费SSL证书的区别点

背景: 在了解免费SSL证书和付费SSL证书的区别之前,先带大家了解一下SSL证书的概念和作用。 SSL证书的概念: SSL证书就是基于http超文本传输协议的延伸,在http访问的基础上增加了一个文本传输加密的协议,由于http是明…

【SQL】1633. 各赛事的用户注册率(COUNT函数 表达式用法)

题目描述 leetcode题目:1633. 各赛事的用户注册率 Code select contest_id, round(count(*)/(select count(*) from Users)*100, 2) as percentage from Register group by contest_id order by percentage desc, contest_id ascCOUNT()函数 COUNT函数用法&#…

每日一题 --- 链表相交[力扣][Go]

链表相交 题目:面试题 02.07. 链表相交 给你两个单链表的头节点 headA 和 headB ,请你找出并返回两个单链表相交的起始节点。如果两个链表没有交点,返回 null 。 图示两个链表在节点 c1 开始相交**:** 题目数据 保证 整个链式结…

【每日力扣】452. 用最少数量的箭引爆气球与763. 划分字母区间

🔥 个人主页: 黑洞晓威 😀你不必等到非常厉害,才敢开始,你需要开始,才会变的非常厉害。 452. 用最少数量的箭引爆气球 有一些球形气球贴在一堵用 XY 平面表示的墙面上。墙面上的气球记录在整数数组 points &#xff0…

Chronos: 将时间序列作为一种语言进行学习

这是一篇非常有意思的论文,它将时间序列分块并作为语言模型中的一个token来进行学习,并且得到了很好的效果。 Chronos是一个对时间序列数据的概率模型进行预训练的框架,它将这些值标记为与基于transformer的模型(如T5)一起使用。模型将序列的…

阿里云ubuntu服务器搭建可视化界面

连接终端 最好初始化服务器的时候 不要以root权限创建 否则会出错 1更新软件: sudo apt-get update2安装ubuntu desktop : sudo apt-get install ubuntu-desktop3 配置ubuntu desktop并重启: sudo apt-get -f install sudo dpkg-reconfigure ubuntu-desktop sudo reboot4 su…

【MySQL】13. 索引(重点)

1. 没有索引,可能会有什么问题 索引:提高数据库的性能,索引是物美价廉的东西了。 不用加内存,不用改程序,不用调sql,只要执行正确的 create index ,查询速度就可能提高成百上千倍。 但是天下没…

VMware ESXi部署macOS Monterey

正文共:1024 字 30 图,预估阅读时间:2 分钟 最早使用黑苹果是在2015年,装在了古老的Acer商务本上(老樹發新芽,acer tm 4750g裝黑蘋果);上次安装黑苹果是在两年前(VMware…

深入理解element-plus table二次封装:从理论到实践的全面指南

前言 在许多中后台管理系统中,表格占据着半壁江山,如果使用element plus组件库,那么少不了要用到table组件,可是table组件的功能过于基础,因此,我在table组件的实现基础之上进一步封装,从而实现…

Windows直接运行python程序

Windows直接运行python程序 一、新建bat脚本二、新建vbs脚本 一、新建bat脚本 新建bat批处理脚本,写入以下内容 echo off call conda activate pytorch python app.pyecho off:在此语句后所有运行的命令都不显示命令行本身,但是本身的指令是…

微信小程序的页面交互练习——实现比较两数大小功能

前提&#xff1a;配置好页面后 一、在wxml里面搭建好框架&#xff1a; <navigation-bar title"Weixin" back"{{false}}" color"black" background"#FFF"></navigation-bar> <scroll-view class"scrollarea"…

简单了解原型模式

什么是原型模式 区别于单例模式&#xff0c;原型模式的一个类可以有多个实例化的对象。 原型模式通过拷贝来产生新的对象&#xff0c;而不是new&#xff0c;并且可以根据自己的需求修改对象的属性。 实现Cloneable接口实现拷贝 而拷贝又分为浅拷贝和深拷贝&#xff0c;两者在…

大模型论文阅读:ADAPTIVE BUDGET ALLOCATION FOR PARAMETEREFFICIENT FINE-TUNING

大模型论文阅读:ADAPTIVE BUDGET ALLOCATION FOR PARAMETEREFFICIENT FINE-TUNING 论文链接:https://arxiv.org/pdf/2303.10512v1.pdf 当存在大量下游任务时,微调所有预训练模型的参数变得不可行。因此,为了以参数高效的方式学习预训练权重的增量更新,提出了许多微调方法,…

node.js项目初始化操作

项目环境Vscode 1.新建一个文件夹node.js(xx.js) 2.右键点击node.js&#xff0c;点击打开终端 我在VScode打开终端 输入npm init初始化项目没反应。 解决方法&#xff1a;进入文件夹node.js&#xff0c;出入cmd跳转到终端 重新输入npm init命令 正确结果如下图 后续命令按下…

酷开科技依托酷开系统用“平台+产品+场景”塑造全屋智能生活!

杰弗里摩尔的“鸿沟理论”中写道&#xff1a;高科技企业推进产品的早期市场和产品被广泛接受的主流市场之间&#xff0c;存在着一条巨大的“鸿沟”。“鸿沟”&#xff0c;指产品吸引早期接纳者后、赢得更多客户前的那段间歇&#xff0c;以及其中可预知和不可预知的阻碍。多数产…

SPRING-BOOT实现HTTP大文件断点续传分片下载

版本&#xff1a;6.5.40 代码&#xff1a;up6-jsp-springboot: Web大文件上传-jsp-springboot示例 - Gitee.com nosql示例 nosql示例不需要进行任何配置&#xff0c;可以直接访问测试。 SQL示例 1.创建数据库 2.配置数据库连接 3.自动下载maven依赖 4.启动项目 启动成功 6.访…

Oracle VM(虚拟机)性能监控工具

Oracle VM是一个独立的虚拟化环境&#xff0c;由 Oracle 提供支持和设计&#xff0c;旨在为运行虚拟机提供轻量级、安全的基于服务器的平台。Oracle VM 能够在受支持的虚拟化环境中部署操作系统和应用软件&#xff0c;Oracle VM 将用户和管理员与底层虚拟化技术隔离开来&#x…

队列+宽搜例题讲解!

429. N 叉树的层序遍历 题目解析&#xff1a; 根据题目分析&#xff0c;可以看出题目要我们求的是N叉数的层序遍历&#xff0c;就是把每层的放在一块&#xff0c;最后把每层都输出出来即可&#xff01; 算法分析&#xff1a; 我们可以利用队列先进先出的特性进行求解&#x…

56. 合并区间(力扣LeetCode)

文章目录 56. 合并区间题目描述思路贪心算法方法一&#xff1a;直接在res中修改代码逻辑梳理&#xff1a; 方法二&#xff1a;在原数组中插入一个超出题目范围的数组代码逻辑梳理&#xff1a; 56. 合并区间 题目描述 以数组 intervals 表示若干个区间的集合&#xff0c;其中单…

MyBatis-Plus分页接口实现教程:Spring Boot中如何编写分页查询

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…