PiflowX组件-ReadFromUpsertKafka

ReadFromUpsertKafka组件

组件说明

upsert方式从Kafka topic中读取数据。

计算引擎

flink

有界性

Unbounded

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分反序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分反序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
propertiesPROPERTIES“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。

ReadFromUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{
  "flow": {
    "name": "ReadFromUpsertKafkaTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "5555",
        "name": "ReadFromUpsertKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromUpsertKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "result_total_pv_uv_min",
          "key_format": "json",
          "value_format": "json",
          "value_fields_include": "ALL",
          "tableDefinition": "{\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null}",
          "properties": "{\"value.json.fail-on-missing-field\": false,\"properties.group.id\": \"test\"}"
        }
      },
      {
        "uuid": "6666",
        "name": "ShowChangeLogData1",
        "bundle": "cn.piflow.bundle.flink.common.ShowChangeLogData",
        "properties": {
          "showNumber": "5000"
        }
      }
    ],
    "paths": [
      {
        "from": "ReadFromUpsertKafka1",
        "outport": "",
        "inport": "",
        "to": "ShowChangeLogData1"
      }
    ]
  }
}
示例说明
  1. 通过k.kafka.ReadFromUps从kafka的result_total_pv_uv_min topic中读取数据(使用WriteToUpsertKafka组件写入到result_total_pv_uv_min中的数据);

  2. 通过ShowChangeLogData组件将数据输出到控制台。

tableDefinition属性结构
{
    "ifNotExists": true,
    "physicalColumnDefinition": [{
        "columnName": "do_date",
        "columnType": "STRING",
        "nullable": false,
        "primaryKey": true,
        "partitionKey": false,
        "comment": "统计日期"
    }, {
        "columnName": "do_min",
        "columnType": "STRING",
        "nullable": false,
        "primaryKey": true,
        "partitionKey": false,
        "comment": "统计分钟"
    }, {
        "columnName": "pv",
        "columnType": "BIGINT",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "点击量"
    }, {
        "columnName": "uv",
        "columnType": "BIGINT",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "一天内同个访客多次访问仅计算一个UV"
    }, {
        "columnName": "currenttime",
        "columnType": "TIMESTAMP",
        "nullable": false,
        "primaryKey": false,
        "partitionKey": false,
        "comment": "当前时间"
    }],
    "metadataColumnDefinition": null,
    "computedColumnDefinition": null
}

演示DEMO

在这里插入图片描述
欢迎关注PiflowX公众号,谢谢支持!!!

在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/300432.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

OpenSource - 基于Netty的网络扩展库HServer

文章目录 概述官网Hserver的理念特点原理图代码案例HelloWorld 概述 HServer是一个基于Netty开发网络扩展库.使用插件方式来扩展我们的业务 HServer提供 web,gateway,rpc 等插件 同时用户也可以自定义插件,来完成各种各样的业务场景。 官网 https://gitee.com/HSe…

计算机毕业设计 | SpringBoot+vue移动端音乐网站 音乐播放器(附源码)

1,项目背景 随着计算机技术的发展,网络技术对我们生活和工作显得越来越重要,特别是现在信息高度发达的今天,人们对最新信息的需求和发布迫切的需要及时性。为了满足不同人们对网络需求,各种特色,各种主题的…

一文带你弄懂 V8 数组的快速 / 字典模式

V8 是如何处理数组的&#xff1f; 问题 先抛出一个问题&#xff0c;下面两端代码哪个的效率更高&#xff1f; const arr []; for(let i 0; i < 10000000; i) {arr[i] 1; }const arr []; arr[10000000 - 1] 1; for(let i 0; i < 10000000; i) {arr[i] 1; }答案是…

一文搞定通过UTM 在MAC M1上安装Win11

Why 临近过年&#xff0c;一年一度的抢票大战就要开始。抢票软件要求安装在windows&#xff0c;作为mac资深用户&#xff0c;必须安装个windows虚拟机。 How step by step: follow YouTube。具体step follow YouTube 视频。本文&#xff0c;只说一下&#xff0c;特别容易错…

Unity中Shader面片一直面向摄像机(个性化修改及适配BRP)

文章目录 前言一、个性化修改面向摄像机效果1、把上一篇文章中求的 Z轴基向量 投影到 XoZ平面上2、其余步骤和之前的一致3、在属性面板定义一个变量&#xff0c;控制面片面向摄像机的类型4、效果 二、适配BRP三、最终代码 前言 在上一篇文章中&#xff0c;我们用Shader实现了面…

1.7 day5 IO进程线程

互斥锁 #include <myhead.h> char buf[128];//创建临界资源 pthread_mutex_t mutex;//创建锁 void *task(void *arg)//分支线程 {while(1){pthread_mutex_lock(&mutex);//上锁printf("分支线程:buf%s\n",buf);strcpy(buf,"I Love China");pthre…

mysql基础-表数据操作之查

目录 1.别名 2. 单表查询 2.1 模糊查询 - like 2.2 limit 2.3 order by 2.4 group by 2.5 in 2.6 between and 2.6 is null 2.7 not 2.8 运算符 3. 联表查询 全连接 左连接 右连接 本次分享一下数据的DQL语言。 1.别名 首先分享一下别名的知识。我们在查询的时…

【创建VirtualBox虚拟机并安装openEuler20.03 TLS SP1操作系统】

创建VirtualBox虚拟机并安装openEuler20.03 TLS SP1操作系统 一、环境说明二、安装过程 一、环境说明 虚拟机软件&#xff1a;Virtualbox操作系统&#xff1a;openEuler 20.03 TLS SP1&#xff08;x86&#xff09; 二、安装过程 创新虚拟机 修改虚拟机配置&#xff1a; …

MySQL复习汇总(图书管理系统)

MySQL图书管理系统&#xff08;49-94&#xff09;源码_71.备份book数据库到e盘的mybook.sql文件(备份文件中要求包含建库命令)-CSDN博客 -- 1、 创建一个名称为book的数据库。 -- 2、 打开book数据库 -- 3、 创建数据表分别如下&#xff08;除外键之外&#xff09;…

IDEA 中搭建 Spring Boot Maven 多模块项目 (父SpringBoot+子Maven)

第1步&#xff1a;新建一个SpringBoot 项目 作为 父工程 [Ref] 新建一个SpringBoot项目 删除无用的 .mvn 目录、 src 目录、 mvnw 及 mvnw.cmd 文件&#xff0c;最终只留 .gitignore 和 pom.xml 第2步&#xff1a;创建 子maven模块 第3步&#xff1a;整理 父 pom 文件 ① …

支付宝扫码(Easy版)支付实现

文章目录 一 技术准备1.1 二维码技术&#xff08;java&#xff09;1.2 支付宝沙箱环境准备1.3 内网穿透 二 支付宝支付相关知识2.1 各种支付方式2.2 扫码付接入流程2.3 系统交互流程(时序图)2.4 加密逻辑 三 扫码支付实现3.1 添加maven依赖&#xff08;Easy版&#xff09;3.2 完…

【Python发送邮件】

Python发送邮件 使用python的email和smtplib发送邮件 使用python的email和smtplib发送邮件 需要先下载安装 email 和 smtplib 模块 // An highlighted block pip install email smtplib需要去你的邮件地址申请一下 API Key&#xff0c;这是专门用来开发时使用的密钥。 获取后…

看CHAT如何判断php Imagick writeImages写入gif已经完毕

CHAT回复&#xff1a;Imagick::writeImages() 是同步执行的&#xff0c;也就是说这个函数会阻塞直到 GIF 文件被完全写出。所以如果这个函数没有报错并成功返回&#xff0c;那么你可以认为 GIF 文件已经被完全写出了。 如果你想要在写出 GIF 文件后立即做一些操作&#xff08;例…

学习c语言,隐形类型转换,整形提升

把整形定义字符型的话&#xff0c;字符型指挥提取整形前8位&#xff0c;但是整形有32位&#xff0c;如果字符型最后一位为0全部补0&#xff0c;为1全部补1。

测试分类篇

小王学习录 测试分类按测试对象划分1. 界面测试2. 可靠性测试3. 容错性测试4. 文档测试5. 兼容性测试6. 易用性测试7. 安装卸载测试8. 安全测试9. 性能测试10. 内存泄露测试 按是否查看代码划分1. 黑盒测试2. 白盒测试3. 灰盒测试 按开发阶段分1. 单元测试2. 集成测试3. 系统测…

WebofScience快速检索文献的办法

Web of Science为什么老是搜不到文章&#xff0c;原来是要在这个地方全部勾选 如果是搜标题的话&#xff0c;选Title&#xff0c;输入你要搜的文章标题 另外&#xff0c;也可以在浏览器直接搜文章标题&#xff0c;得到文章的DOI&#xff0c;然后选DOI&#xff0c;直接搜DOI也行…

draw.io基础操作和代码高效画图进阶

文章目录 一、基础操作1、链接2、等比例变形3、复制4、插入表格 二、在线打开三、插入—功能聚集地1、插入图片2、插入画笔3、插入布局4、导出 四、图码转换——高效画图1、通用图码转换2、流程图生成&#xff1a;使用mermaid语言生成图&#xff1a; 五、图码转换高效画图的典型…

超维空间M1无人机使用说明书——52、ROS无人机二维码识别与降落

引言&#xff1a;使用二维码引导无人机实现精准降落&#xff0c;首先需要实现对二维码的识别和定位&#xff0c;可以参考博客的二维码识别和定位内容。本小节主要是通过获取拿到的二维码位置&#xff0c;控制无人机全向的移动和降落&#xff0c;分为两种&#xff0c;一种是无人…

问题:为什么IP和端口一样的两个应用服务可以正常启动呢?

mysql安装 将MongoDB安装好并设置bindIP为0.0.0.0&#xff0c;端口为3306 发现问题&#xff0c;MySQL和MongoDB端口和IP一致&#xff0c;两个服务都能同时启动&#xff0c;这是为什么呢&#xff1f; 麻烦知道的大佬帮忙解答一下~~~

win10下安装detectron2-0.5(0.6应该也可以)

最近从github上下载的实例分割代码是detectron2-0.5的记录下配置过程。 1、前面什么pytorch基本步骤网上教程很多, 对着项目的要求下就行 2、到这里一般都是让你去下一个detectron2&#xff0c;但是我们从github上白嫖的项目都是有detectron2的&#xff0c;所以只需要找到那个s…