PiflowX组件-WriteToUpsertKafka

WriteToUpsertKafka组件

组件说明

以upsert方式往Kafka topic中写数据。

计算引擎

flink

有界性

Streaming Upsert Mode

组件分组

kafka

端口

Inport:默认端口

outport:默认端口

组件属性

名称展示名称默认值允许值是否必填描述例子
kafka_hostKAFKA_HOST“”逗号分隔的Kafka broker列表。127.0.0.1:9092
topicTOPIC“”用于写入Kafka topic名称。topic-1
tableDefinitionTableDefinition“”Flink table定义。
key_formatkeyFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中key部分序列化的格式。key字段由PRIMARY KEY语法指定。json
value_formatValueFormat“”Set(“json”, “csv”, “avro”)用于对Kafka消息中value部分序列化的格式json
value_fields_includeValueFieldsIncludeALLSet(“ALL”, “EXCEPT_KEY”)控制哪些字段应该出现在 value 中。可取值:
"ALL:消息的 value 部分将包含 schema 中所有的字段包括定义为主键的字段。
"EXCEPT_KEY:记录的 value 部分包含 schema 的所有字段,定义为主键的字段除外。
ALL
key_fields_prefixKeyFieldsPrefix“”为所有消息键(Key)格式字段指定自定义前缀,以避免与消息体(Value)格式字段重名。默认情况下前缀为空。 如果定义了前缀,表结构和配置项 ‘key.fields’ 都需要使用带前缀的名称。当构建消息键格式字段时,前缀会被移除, 消息键格式将会使用无前缀的名称。请注意该配置项要求必须将 ‘value.fields-include’ 配置为 ‘EXCEPT_KEY’。
sink_parallelismSinkParallelism“”定义upsert-kafka sink算子的并行度。默认情况下,由框架确定并行度,与上游链接算子的并行度保持一致。
sink_buffer_flush_max_rowsSinkBufferFlushMaxRows“”缓存刷新前,最多能缓存多少条记录。当sink收到很多同key上的更新时,缓存将保留同key的最后一条记录,因此sink缓存能帮助减少发往Kafka topic的数据量,以及避免发送潜在的tombstone消息。 可以通过设置为 ‘0’ 来禁用它默认,该选项是未开启的。注意,如果要开启sink缓存,需要同时设置 ‘sink.buffer-flush.max-rows’ 和 'sink.buffer-flush.interval两个选项为大于零的值。
sink_buffer_flush_intervalSinkBufferFlushInterval“”该选项可以传递任意的 Kafka 参数。选项的后缀名必须匹配定义在 Kafka 参数文档中的参数名。 Flink 会自动移除 选项名中的 “properties.” 前缀,并将转换后的键名以及值传入 KafkaClient。 例如,你可以通过 ‘properties.allow.auto.create.topics’ = ‘false’ 来禁止自动创建 topic。 但是,某些选项,例如’key.deserializer’ 和 ‘value.deserializer’ 是不允许通过该方式传递参数,因为 Flink 会重写这些参数的值。
propertiesPROPERTIES“”Kafka source连接器其他配置

WriteToUpsertKafka示例配置

演示实时统计网页pv和uv的总量。

{
  "flow": {
    "name": "UpsertKafkaTest",
    "uuid": "1234",
    "stops": [
      {
        "uuid": "0000",
        "name": "JsonStringParser1",
        "bundle": "cn.piflow.bundle.flink.json.JsonStringParser",
        "properties": {
          "content": "[{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:32:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1201\",\"access_time\":\"2021-01-08 11:32:55\",\"dt\":\"2021-01-08\"},{\"user_id\":\"2\",\"client_ip\":\"192.165.12.1\",\"client_info\":\"pc\",\"page_code\":\"1031\",\"access_time\":\"2021-01-08 11:32:59\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1101\",\"access_time\":\"2021-01-08 11:33:24\",\"dt\":\"2021-01-08\"},{\"user_id\":\"3\",\"client_ip\":\"192.168.10.3\",\"client_info\":\"pc\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:33:30\",\"dt\":\"2021-01-08\"},{\"user_id\":\"1\",\"client_ip\":\"192.168.12.1\",\"client_info\":\"phone\",\"page_code\":\"1001\",\"access_time\":\"2021-01-08 11:34:24\",\"dt\":\"2021-01-08\"}]",
          "schema": "user_id:STRING,client_ip:STRING,client_info:STRING,page_code:STRING,access_time:TIMESTAMP,dt:STRING"
        }
      },
      {
        "uuid": "1111",
        "name": "WriteToKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "user_ip_pv",
          "tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":null,\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}",
          "format": "json",
          "properties": "{\"json.ignore-parse-errors\":\"true\"}"
        }
      },
      {
        "uuid": "2222",
        "name": "ReadFromKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.ReadFromKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "user_ip_pv",
          "group": "test",
          "startup_mode": "earliest-offset",
          "tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"source_ods_fact_user_ip_pv\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"user_id\",\"columnType\":\"STRING\",\"comment\":\"用户ID\"},{\"columnName\":\"client_ip\",\"columnType\":\"STRING\",\"comment\":\"客户端IP\"},{\"columnName\":\"client_info\",\"columnType\":\"STRING\",\"comment\":\"设备机型信息\"},{\"columnName\":\"page_code\",\"columnType\":\"STRING\",\"comment\":\"页面代码\"},{\"columnName\":\"access_time\",\"columnType\":\"TIMESTAMP\",\"comment\":\"请求时间\"},{\"columnName\":\"dt\",\"columnType\":\"STRING\",\"comment\":\"时间分区天\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null}",
          "format": "json",
          "properties": "{}"
        }
      },
      {
        "uuid": "3333",
        "name": "SQLExecute1",
        "bundle": "cn.piflow.bundle.flink.common.SQLExecute",
        "properties": {
          "sql": "CREATE VIEW view_total_pv_uv_min AS SELECT dt AS do_date, count(client_ip) AS pv, count(DISTINCT client_ip) AS uv,max(access_time) AS access_time FROM source_ods_fact_user_ip_pv GROUP BY dt;"
        }
      },
      {
        "uuid": "4444",
        "name": "WriteToUpsertKafka1",
        "bundle": "cn.piflow.bundle.flink.kafka.WriteToUpsertKafka",
        "properties": {
          "kafka_host": "hadoop01:9092",
          "topic": "result_total_pv_uv_min",
          "key_format": "json",
          "value_format": "json",
          "value_fields_include": "ALL",
          "tableDefinition": "{\"catalogName\":null,\"dbname\":null,\"tableName\":\"result_total_pv_uv_min\",\"ifNotExists\":true,\"physicalColumnDefinition\":[{\"columnName\":\"do_date\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计日期\"},{\"columnName\":\"do_min\",\"columnType\":\"STRING\",\"nullable\":false,\"primaryKey\":true,\"partitionKey\":false,\"comment\":\"统计分钟\"},{\"columnName\":\"pv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"点击量\"},{\"columnName\":\"uv\",\"columnType\":\"BIGINT\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"一天内同个访客多次访问仅计算一个UV\"},{\"columnName\":\"currenttime\",\"columnType\":\"TIMESTAMP\",\"nullable\":false,\"primaryKey\":false,\"partitionKey\":false,\"comment\":\"当前时间\"}],\"metadataColumnDefinition\":null,\"computedColumnDefinition\":null,\"watermarkDefinition\":null,\"asSelectStatement\":\"SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min\"}",
          "properties": "{\"value.json.fail-on-missing-field\": false}"
        }
      }
    ],
    "paths": [
      {
        "from": "JsonStringParser1",
        "outport": "",
        "inport": "",
        "to": "WriteToKafka1"
      },
      {
        "from": "WriteToKafka1",
        "outport": "",
        "inport": "",
        "to": "ReadFromKafka1"
      },
      {
        "from": "ReadFromKafka1",
        "outport": "",
        "inport": "",
        "to": "SQLExecute1"
      },
      {
        "from": "SQLExecute1",
        "outport": "",
        "inport": "",
        "to": "WriteToUpsertKafka1"
      }
    ]
  }
}
示例说明
  1. 通过JsonStringParser将给定的json字符串解析,并输出到下游,通过WriteToKafka组件将数据写入到kafka的user_ip_pv topic中;

  2. 通过ReadFromKafka组件从user_ip_pv topic中读取数据;

  3. 使用SQLExecute组件执行创建视图view_total_pv_uv_min的语句;

  4. 使用WriteToUpsertKafka定义upsert kafka table,并使用tableDefinition属性中定义的asSelectStatement执行语句,将结果写入kafka。

tableDefinition属性结构
{
  "catalogName": null,
  "dbname": null,
  "tableName": "result_total_pv_uv_min",
  "ifNotExists": true,
  "physicalColumnDefinition": [
    {
      "columnName": "do_date",
      "columnType": "STRING",
      "nullable": false,
      "primaryKey": true,
      "partitionKey": false,
      "comment": "统计日期"
    },
    {
      "columnName": "do_min",
      "columnType": "STRING",
      "nullable": false,
      "primaryKey": true,
      "partitionKey": false,
      "comment": "统计分钟"
    },
    {
      "columnName": "pv",
      "columnType": "BIGINT",
      "nullable": false,
      "primaryKey": false,
      "partitionKey": false,
      "comment": "点击量"
    },
    {
      "columnName": "uv",
      "columnType": "BIGINT",
      "nullable": false,
      "primaryKey": false,
      "partitionKey": false,
      "comment": "一天内同个访客多次访问仅计算一个UV"
    },
    {
      "columnName": "currenttime",
      "columnType": "TIMESTAMP",
      "nullable": false,
      "primaryKey": false,
      "partitionKey": false,
      "comment": "当前时间"
    }
  ],
  "metadataColumnDefinition": null,
  "computedColumnDefinition": null,
  "watermarkDefinition": null,
  "asSelectStatement": "SELECT  do_date,cast(DATE_FORMAT(access_time,'HH:mm') AS STRING) AS do_min,pv,uv,NOW() AS currenttime from view_total_pv_uv_min"
}

演示DEMO
在这里插入图片描述

演示案例参考

实时数仓|以upsert的方式读写Kafka数据—Flink1.12为例_upsert-connect 时间周期-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/283334.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity坦克大战开发全流程——结束场景——通关界面

结束场景——通关界面 就照着这样来拼 写代码 hideme不要忘了 修改上一节课中的代码

旅游网站Xtrip 前端模板html推荐

一、需求分析 旅游网站的功能可以根据具体的业务需求和目标进行不同的设计和实现,但是以下是一些常见的旅游网站功能,供参考: 酒店预订功能:用户可以搜索并预订酒店,查看酒店的详细信息、价格、评价和照片&#xff0c…

MySQL 8.0 InnoDB Tablespaces之General Tablespaces(通用表空间/一般表空间)

文章目录 MySQL 8.0 InnoDB Tablespaces之General Tablespaces(通用表空间/一般表空间)General tablespaces(通用表空间/一般表空间)通用表空间的功能通用表空间的限制 创建通用表空间(一般表空间)创建语法…

Linux磁盘与文件管理

目录 一、磁盘介绍 1. 磁盘数据结构 2. 磁盘的接口类型 3. 磁盘在Linux上的表现形式 二、磁盘分区与MBR 1. 分区优缺点 2. 分区方式 3. MBR分区 4. GPT分区 三、文件系统 1. 文件系统的组成 2. 默认的文件系统 3. 文件系统的作用 4. 模拟破坏文件与修复文件 4…

项目总结报告

《项目总结报告》 1.项目概要(项目基本信息,项目期间,项目成果,项目开发工具环境) 2.项目工作分析(需求变更,计划与进度实施,投入情况,收益情况,质量情况&…

【Linux】Linux 下基本指令 -- 详解

无论是什么命令,用于什么用途,在 Linux 中,命令有其通用的格式: command [-options] [parameter] command:命令本身。-options:[可选,非必填]命令的一些选项,可以通过选项控制命令的…

navicat premium历史版本下载及更新navicat premium15 永久(使用)有效期

1、navicat premium介绍 Navicat Premium 是一套可创建多个连接的数据库开发工具,让你从单一应用程序中同时连接 MySQL、Redis、MariaDB、MongoDB、SQL Server、Oracle、PostgreSQL 和 SQLite 。它与 GaussDB 、OceanBase 数据库及 Amazon RDS、Amazon Aurora、Amaz…

windows和linux操作Git(序章2)

Git 分布式版本控制系统(序章1) ## Linux 下安装 Git🔺 ​ ## Git命令大全🔺 windows和linux通用 安装完 Git 后,需要进行配置,如姓名、Email 等 git config --global user.name "你的名字"git config --global us…

解算人生--写于2023跨年之夜

最近买了一本书,书名叫《计算》 读了部分内容,虽然理解上还需要再下下功夫,但是直观的感觉冲击还是挺大的,最明显的就是表面与本质的把握。大家可能都有这样一种感觉,初步涉足某一领域时,开始我们都会被大量…

vmware部署docker+springboot+MySQL(超详细)

一、前期准备 (一)安装jdk #docker search openjdk #docker pull openjdk:8 (二)确认网络 如果局域网其他终端(如手机访问),虚拟机网络连接需要选择《桥接》模式,而且,需要使用有线连接,不能使用Wi-Fi,切忌切忌! 并且要选择实际的那个有线连接。比如我这里是“R…

Leetcode 剑指 Offer II 059. 数据流中的第 K 大元素

题目难度: 简单 原题链接 今天继续更新 Leetcode 的剑指 Offer(专项突击版)系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 设计一个找到数据流中第 k 大元素的类(class&#xf…

文心一言 VS 讯飞星火 VS chatgpt (169)-- 算法导论13.2 2题

二、用go语言,证明:在任何一棵有 n 个结点的二叉搜索树中,恰有 n-1种可能的旋转。 文心一言: 要证明在任何一棵有 n 个结点的二叉搜索树中,恰有 n-1 种可能的旋转,我们可以按照以下步骤逐步推导&#xff…

声明式导航传参详情

1 动态路由传参 路由规则path ->/article/:aid 导航链接 <router-link to"/article/1">查看第一篇文章</router-link> 组件获取参数: this.$route.params.aid 如果想要所有的值&#xff0c;就用this. $route. params 注意&#xff1a;这两个必须匹配…

【数据结构】图论与并查集

一、并查集 1.原理 简单的讲并查集&#xff0c;就是查询两个个元素&#xff0c;是否在一个集合当中&#xff0c;这里的集合用树的形式进行表示。并查集的本质就是森林, 即多棵树。 我们再来简单的举个例子: 假设此时的你是大一新生&#xff0c;刚进入大学&#xff0c;肯定是…

【C++对于C语言的扩充】C++与C语言的联系,命名空间、C++中的输入输出以及缺省参数

文章目录 &#x1f680;前言&#x1f680;C有何过C之处&#xff1f;&#x1f680;C中的关键字&#x1f680;命名空间✈️为什么要引入命名空间&#xff1f;✈️命名空间的定义✈️如何使用命名空间中的内容呢&#xff1f; &#x1f680;C中的输入和输出✈️C标准库的命名空间✈…

SpringBoot实用篇

SpringBoot实用篇 1、热部署 什么是热部署&#xff1f; 所谓热部署&#xff0c;就是在应用正在运行的时候升级软件&#xff0c;却不需要重新启动应用。对于Java应用程序来说&#xff0c;热部署就是在运行时更新Java类文件。 热部署有什么用&#xff1f; 节约时间&#xff0c;热…

Mybatis枚举类型处理和类型处理器

专栏精选 引入Mybatis Mybatis的快速入门 Mybatis的增删改查扩展功能说明 mapper映射的参数和结果 Mybatis复杂类型的结果映射 Mybatis基于注解的结果映射 Mybatis枚举类型处理和类型处理器 再谈动态SQL Mybatis配置入门 Mybatis行为配置之Ⅰ—缓存 Mybatis行为配置…

linux下docker搭建Prometheus +SNMP Exporter +Grafana进行核心路由器交换机监控

一、安装 Docker 和 Docker Compose https://docs.docker.com/get-docker/ # 安装 Docker sudo apt-get update sudo apt-get install -y docker.io# 安装 Docker Compose sudo apt-get install -y docker-compose二、创建配置文件及测试平台是否正常 1、选个文件夹作为自建…

医学图像分割中的频域多轴表示学习

摘要 https://arxiv.org/pdf/2312.17030v1.pdf 最近&#xff0c;视觉Transformer (ViT)在医学图像分割&#xff08;MIS&#xff09;中得到了广泛应用&#xff0c;这归功于其在空间域应用自注意力机制来建模全局知识。然而&#xff0c;许多研究都侧重于改进空间域模型&#xff…

LeetCode刷题--- 不同路径 III

个人主页&#xff1a;元清加油_【C】,【C语言】,【数据结构与算法】-CSDN博客 个人专栏 力扣递归算法题 http://t.csdnimg.cn/yUl2I 【C】 ​​​​​​http://t.csdnimg.cn/6AbpV 数据结构与算法 ​​​http://t.csdnimg.cn/hKh2l 前言&#xff1a;这个专栏主要讲述递…