【大数据】NiFi 中的处理器(二):PutDatabaseRecord

NiFi 中的处理器(二):PutDatabaseRecord

  • 1.基本介绍
  • 2.属性配置
  • 3.连接关系
  • 4.应用场景

1.基本介绍

PutDatabaseRecord 处理器使用指定的 RecordReader 从传入的流文件中读取(可能是多个,说数组也成)记录。这些记录将转换为 SQL 语句,并作为一个批次执行。如果发生任何错误,则将流文件路由到 failureretry,如果执行成功,则将传入的流文件路由到 success。处理器执行的 SQL 语句类型通过 Statement Type 属性指定,该属性接受一些硬编码的值,例如 INSERTUPDATEDELETE ,使用 "Use statement.type Attribute" 可以使处理器获取流文件属性中的语句类型。

说明:如果语句类型为 UPDATE,正常的不应该修改主键的值。如果记录中修改主键的值,那么有可能找不到数据进行修改或者修改破坏了一些数据(说白了,代码是按照根据主键值为条件进行 update 的)。

当然,隐藏的功能是 statement.type 的值时 'SQL' 的时候,可以从 record 中的某个字段读取值,此值应该是一个可以执行的 SQL 语句,该处理器就执行这个 SQL 就可以了。

2.属性配置

在下面的列表中,必需属性的名称以粗体显示。任何其他属性(不是粗体)都被认为是可选的,并且指出属性默认值(如果有默认值),以及属性是否支持表达式语言。

Name
Default Value
Allowable Values
Description
Record ReaderController Service API:
RecordReaderFactory
Implementations: JsonPathReader
XMLReader
ScriptedReader
CSVReader
Syslog5424Reader
GrokReader
AvroReader
JsonTreeReader
ParquetReader
SyslogReader
指定用于解析传入数据和确定数据模式的 Controller Service。
Database TypeGenericGeneric
Oracle
Oracle 12+
MS SQL 2012+
MS SQL 2008
MySQL
PostgreSQL
数据库的类型/风格,用于生成特定于数据库的代码。在许多情况下,通用类型就足够了,但是某些数据库(例如 Oracle)需要自定义 SQL 子句。
Statement TypeUPDATE
INSERT
UPSERT
DELETE
Use statement.type Attribute
指定要生成的 SQL 语句的类型。请参考数据库文档以获取每个操作行为的描述。请注意,某些数据库类型可能不支持某些语句类型。如果选择了 "Use statement.type Attribute",则该值取自 FlowFile 中的 statement.type 属性。 "Use statement.type Attribute" 选项是唯一允许使用 "SQL" 语句类型的选项。如果指定了 "SQL",则 "Field ContainingSQL" 属性指定的字段的值应为目标数据库上的有效 SQL 语句,并将按原样执行。
Database Connection Pooling ServiceController Service API:
DBCPService
Implementations:
DBCPConnectionPool
HiveConnectionPool
DBCPConnectionPoolLookup
Controller Service,用于获得与数据库的连接以发送记录。
Catalog Name语句应更新的目录的名称。这可能不适用于你要更新的数据库。在这种情况下,请将该字段留空。
Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Schema Name表所属的schema的名称。这可能不适用于你要更新的数据库。在这种情况下,请将该字段留空。
Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Table Name语句应影响的表的名称。
Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Translate Field Namestruetrue
false
如果为 true,则处理器将尝试将字段名称转换为指定表的适当列名称。如果为 false,则字段名称必须与列名称完全匹配,否则该列将不会更新。
Unmatched Field BehaviorIgnore Unmatched FieldsIgnore Unmatched Fields
Fail on Unmatched Fields
如果输入的记录有一个字段没有映射到数据库表的任何列,该属性会指定如何处理这种情况。
Unmatched Column BehaviorFail on Unmatched ColumnsIgnore Unmatched Columns
Warn on Unmatched Columns
Fail on Unmatched Columns
如果输入的记录没有数据库表所有列的字段映射,该属性会指定如何处理这种情况。
Update Keys列名的逗号分隔列表,可唯一标识数据库中 UPDATE 语句的行。如果语句类型为 UPDATE 且未设置此属性,则使用表的主键。在这种情况下,如果不存在主键,并且如果 Unmatched Column Behaviour 设置为 FAIL,则到 SQL 的转换将失败。如果语句类型为 INSERT,则忽略此属性。
Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Field Containing SQL如果语句类型为 "SQL"(在 statement.type 属性中设置),则此字段指示记录中的哪个字段包含要执行的 SQL 语句。该字段的值必须是单个 SQL 语句。如果语句类型不是 "SQL",则忽略此字段。
Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Allow MultipleSQL Statementsfalsetrue
false
如果语句类型为 "SQL"(在 statement.type 属性中设置),则此字段指示是否用分号分隔字段值并分别执行每个语句。如果有任何语句导致错误,则将回滚整个语句集。如果语句类型不是 "SQL",则忽略此字段。
Quote Column Identifiersfalsetrue
false
启用此选项将导致所有列名都被引用,从而允许你将保留字用作表中的列名。
Quote Table Identifiersfalsetrue
false
启用该选项后,表名将加引号,以支持在表名中使用特殊字符。
Max Wait Time 0 0 0 seconds运行的 SQL 语句所允许的最长时间, 0 0 0 表示没有限制。少于 1 1 1 秒的最长时间将等于 0 0 0
Supports Expression Language: true (will be evaluated using variable registry only)
Rollback On Failurefalsetrue
false
指定如何处理错误。默认情况下(false),如果在处理 FlowFile 时发生错误,则 FlowFile 将根据错误类型路由到 "failure""retry" 关系,处理器可以继续使用下一个 FlowFile。相反,你可能想回滚当前已处理的 FlowFile,并立即停止进一步的处理。在这种情况下,你可以通过启用此 Rollback On Failure 属性来实现。如果启用,失败的 FlowFiles 将保留在输入关系中,而不会受到惩罚,并会反复处理,直到成功处理或通过其他方式将其删除。重要的是要设置足够的 "Yield Duration",以免重试太频繁。
Table Schema Cache Size 100 100 100指定应缓存多少个表模式
Maximum Batch Size 0 0 0指定 INSERTUPDATE 语句的最大批处理大小。该参数对 Statement Type 中指定的其他语句无效。 0 0 0 表示批量不受限制。
Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)

在这里插入图片描述

3.连接关系

Name
Description
retry如果无法更新数据库,但再次尝试操作可能会成功,将 FlowFile 路由到此关系。
success从 SQL 查询结果集中成功创建了 FlowFile。
failure如果无法更新数据库,并且无法重试该操作(例如无效查询或违反完整性约束),也会将 FlowFile 路由到此关系。

4.应用场景

PutDatabaseRecord 之前,我们想要写入数据到数据库,往往需要使用 ConvertJsonToSql + PutSQL 组合,尤其是当数据格式不是 json 的时候还需要先将数据转换为 json,而使用 ConvertJsonToSql 属于一边连接了目标库,一边要在内存解析一次数据,转成了参数化的 SQL,并且参数也是放到 FlowFile 的属性中,平白无故的这个 FlowFile 也就更吃内存了。PutDatabaseRecord 的好处就是我们可以将任何 NIFI 支持的 Record 写入指定目的,在内存解析一次数据就可以了。当然了,前后两种方式写数据到数据库的基本原理都是一样的,只是 PutDatabaseRecord 的效率更好一些。

最早,PutDatabaseRecord 支持将特定的 Record 集合转成 InsertUpdateDelete 语句,我们只要选择 Statement Type 即可。然后为了更灵活,增加了 Use statement.type Attribute 选项,我们可以在上游的 FlowFile 中指定 statement.type 属性,这期间又暗地里加了 "statement.type=SQL" 的功能,当 Statement Type 的值为 "SQL" 的时候,我们要配合 Field Containing SQL 配置进行工作。Field Containing SQL 指的是上游来的 FlowFile 中的一个字段,这个字段值是一个可执行的 SQL。

可能让我们比较迷茫的是 Unmatched Field BehaviorUnmatched Column Behavior,我们如果纠结这两个配置的描述就会很难受,我们只关注两个单词 FieldColumn 就可以分清楚了。

Column 我们知道,(目标)表的列嘛,就是说如果你手里的数据中的列没有与我目标表的 Column 对应会怎么样。而 Field 针对的是 Record(博主注:可以理解为一行行数据),是具体的数据,就是说如果你目标表里的列没有与我 Record 中的 Field 相对应会怎么样。具体的关系我描述一下:首先 Record 中会携带 schema 元数据信息(或推断出 schema 信息),信息里会有若干个 Field。我们在生成 SQL 的时候,会从目标数据库查询指定表的元数据信息(放缓存里),而数据库里设置成非 null 的且非自增长的没有设置默认值的则认为是 required 字段。

  • 然后针对 insertdelete 大体有三个步骤:
    • 第一步是遍历 required 字段,看 Record 里是否都有这几个字段,如果没有就用到 Unmatched Column Behavior,如果我们配置了 ignore 了,就继续执行。
    • 第二步是对这几个 Field 的遍历,查询是否在指定表的元数据里有对应的列信息,当遇到没有的情况时,就是 Unmatched Field Behavior,如果我们配置了 ignore 了,就继续执行。如果存在,我们就放到一个集合 set 里存起来。
    • 第二步遍历结束后,第三步我们再判断这个集合 set 有没有值,如果是空的,就直接报 "None of the fields in the record map to the columns defined by the " + tableName + " table" 的 SQLDataException 异常了。
  • update 的话稍微有些不一样,第一步就检测 Update Keys,如果没有对应值就默认使用目标表的主键,如果都没有值就报 "Table '" + tableName + "' does not have a Primary Key and no Update Keys were specified" 异常了,然后紧接着检测 Record 里是否有这些字段,没有就要 Unmatched Column Behavior。第二步跟上面一样,就是对这几个 Field 的遍历,查询是否在指定表的元数据里有对应的列信息,当遇到没有的情况时,就是 Unmatched Field Behavior,如果我们配置了 ignore 了,就继续执行。
  • 最后 upset 的检查就是融合了 insertupdate

在这里插入图片描述
然后得说一下这个 Translate Field Names,这个功能点其实非常好,其实就是将列名转大写并替换下划线(Record 中的列和指定表的列都做此转换,指定表的列信息会做成一个 Map 映射,转换的列名 : 列元数据信息)。

private static String normalizeColumnName(final String colName, final boolean translateColumnNames) {
        return colName == null ? null : (translateColumnNames ? colName.toUpperCase().replace("_", "") : colName);
    }

fieldName 转大写并替换下划线,然后跟指定表的同样转换过后的列元数据信息映射进行匹配,记录下 Field 的那个索引值,然后组 SQL 设置参数的时候根据索引值找到 Record 中对应的 value 就行了。这个功能其实就是帮助我们更好的对 Record 列和目标表列进行匹配。而 SQL 中的列名其实用的还是从指定表查询出来的列元数据信息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/310293.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

仿蓝奏云网盘 /file/list SQL注入漏洞复现

0x01 产品简介 仿蓝奏网盘是一种类似于百度网盘的文件存储和共享解决方案。它为用户提供了一个便捷的平台,可以上传、存储和分享各种类型的文件,方便用户在不同设备之间进行文件传输和访问。 0x02 漏洞概述 仿蓝奏云网盘 /file/list接口处存在SQL注入漏洞,登录后台的攻击…

启英泰伦离线自然说:让语音交互更“顺口”

你是不是也有这样的烦恼?每次用语音控制家里的智能设备,总是要说那几个固定的词,感觉有点别扭。比如,每次都要说“打开空调”,不能换个说法吗? 现在,有了启英泰伦的离线自然说技术,…

Kafka之集群搭建

1. 为什么要使用kafka集群 单机服务下,Kafka已经具备了非常高的性能。TPS能够达到百万级别。但是,在实际工作中使用时,单机搭建的Kafka会有很大的局限性。 ​ 消息太多,需要分开保存。Kafka是面向海量消息设计的,一个T…

QT第1天

题目&#xff1a;点击按钮改变文字 需要增加一个count属性&#xff0c;并且只需要定义槽&#xff0c;信号函数已经内置好了 //widget.h#ifndef WIDGET_H #define WIDGET_H#include <QWidget>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Wi…

一文搞定SkyWalking 中Trace、Tracesegment 和 Span 的关系,非常重要!

基础概念 追踪&#xff08;Trace&#xff09; 是指一个请求或者一个操作从开始到结束的完整路径。它涵盖了分布式系统中所有相关组件的调用关系和性能信息。 跨度&#xff08;Span&#xff09; 是Trace的组成部分之一。Span代表一次调用或操作的单个组件&#xff0c;可以是…

centOS系统yum安装和卸载mongodb

0.1 什么是mongodb&#xff1f; 0.2 Mongodb是一个基于分布式文件存储的数据库。由C语言编写。旨在为WEB应用提供可扩展的高性能数据存储解决方案。 0.3 Mongodb是一个介于关系数据库和非关系数据库之间的产品&#xff0c;是非关系数据库当中功能最丰富&#xff0c;最像关系数据…

C# 关于多态性学习

前言 我相信大家都对面向对象的三个特征封装、继承、多态很熟悉&#xff0c;每个人都能说上一两句&#xff0c;但是大多数都仅仅是知道这些是什么&#xff0c;今天这篇文章是对C# 的多态性学习一下&#xff0c;巩固自己的基础&#xff0c;我们都知道同一操作作用于不同的对象&…

模板管理支持批量操作,DataEase开源数据可视化分析平台v2.2.0发布

2024年1月8日&#xff0c;DataEase开源数据可视化分析平台正式发布v2.2.0版本。 这一版本的功能升级包括&#xff1a;在“模板管理”页面中&#xff0c;用户可以通过模板管理的批量操作功能&#xff0c;对已有模板进行快速重新分类、删除等维护操作&#xff1b;数据大屏中&…

【UE Niagara学习笔记】06 - 制作火焰喷射过程中飞舞的火星

在上一篇博客&#xff08;【UE Niagara学习笔记】05 - 喷射火焰顶部的蓝色火焰&#xff09;的基础上继续实现喷射火焰的火星的效果。 目录 效果 步骤 一、创建材质实例 二、添加新的发射器 2.1 设置粒子材质 2.2 设置发射器持续生成粒子 2.3 设置粒子生成数量 2.4 设…

【麒麟V10系统x86环境--bash: ./install:/bin/bash:解释器错误: 权限不够】

不知道那位大拿分享的这个神操作、给力呀 标题-bash: ./install&#xff1a;/bin/bash&#xff1a;解释器错误: 权限不够 执行这个命令即可&#xff1b;sudo setstatus Softmode

【现代密码学】笔记3.4-3.7--构造安全加密方案、CPA安全、CCA安全 《introduction to modern cryphtography》

【现代密码学】笔记3.4-3.7--构造安全加密方案、CPA安全、CCA安全 《introduction to modern cryphtography》 写在最前面私钥加密与伪随机性 第二部分流加密与CPA多重加密 CPA安全加密方案CPA安全实验、预言机访问&#xff08;oracle access&#xff09; 操作模式伪随机函数PR…

自动化控制面板-1Panel

一、1Panel自动化控制面板 官网地址 1Panel 可以实现&#xff1a; 快速建站、高效管理、安全可靠、一键备份、应用商店 快速建站&#xff1a;深度集成 Wordpress 和 Halo&#xff0c;域名绑定、SSL 证书配置等一键搞定&#xff1b;高效管理&#xff1a;通过 Web 端轻松管理 …

构建数字化美食未来:深入了解连锁餐饮系统的技术实现

在当今数字化时代&#xff0c;连锁餐饮系统的设计与开发已成为餐饮业成功经营的重要一环。本文将深入研究连锁餐饮系统的技术实现&#xff0c;结合代码演示&#xff0c;为技术开发者和餐饮业者提供深刻的理解。 1. 技术选型与系统架构 在开始设计开发前&#xff0c;首先要考…

SD卡无法格式化怎么解决?

如何修复无法格式化的SD卡&#xff1f; 提供了4种SD卡无法格式化的解决方法&#xff0c;你可根据具体情况和需要选择合适的方法。 方法1. 更改驱动器号 有时&#xff0c;SD卡无法格式化是因为SD卡无法访问 。为了确保你的Windows操作系统能够识别并显示你的SD卡&#xff0c;…

《JVM由浅入深学习【七】 2024-01-11》JVM由简入深学习提升分享

亲爱的读者们&#xff0c;欢迎来到本篇博客&#xff0c;这是JVM第七次分享&#xff0c;下面是七个JVM常用常面的分享&#xff0c;请笑纳 目录 1. 几个与JVM 内存相关的核心参数2.如何计算一个对象的大小3.堆为什么要分为新生代和老年代4.JVM堆的年轻代为什么要有两个 Survivor…

大话 JavaScript(Speaking JavaScript):第二十一章到第二十五章

第二十一章&#xff1a;数学 原文&#xff1a;21. Math 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 Math对象用作多个数学函数的命名空间。本章提供了一个概述。 数学属性 Math的属性如下&#xff1a; Math.E 欧拉常数&#xff08;e&#xff09; Math.LN2 2 …

基于ssm校内二手商城交易系统+vue论文

摘 要 计算机网络发展到现在已经好几十年了&#xff0c;在理论上面已经有了很丰富的基础&#xff0c;并且在现实生活中也到处都在使用&#xff0c;可以说&#xff0c;经过几十年的发展&#xff0c;互联网技术已经把地域信息的隔阂给消除了&#xff0c;让整个世界都可以即时通话…

计算数学表达式的程序(Java课程设计)

1. 课设团队介绍 团队名称 团队成 员介绍 任务分配 团队成员博客 XQ Warriors 徐维辉 负责计算器数据的算法操作&#xff0c;如平方数、加减乘除&#xff0c;显示历史计算记录 无 邱良厦&#xff08;组长&#xff09; 负责计算器的图形设计&#xff0c;把输入和结果显…

LeetCode-搜索插入位置(35)

题目描述&#xff1a; 给定一个排序数组和一个目标值&#xff0c;在数组中找到目标值&#xff0c;并返回其索引。如果目标值不存在于数组中&#xff0c;返回它将会被按顺序插入的位置。 请必须使用时间复杂度为 O(log n) 的算法。 思路&#xff1a; 给定数组查找指定元素值的…

VUE element-ui实现表格动态展示、动态删减列、动态排序、动态搜索条件配置

1、实现效果 1.1、文件目录 1.2、说明 1、本组件支持列表的表头自定义配置&#xff0c;checkbox实现 2、本组件支持列表列排序&#xff0c;vuedraggable是拖拽插件&#xff0c;上图中字段管理里的拖拽效果 &#xff0c;需要的话请自行npm install 3、本组件支持查询条件动态…