【大数据】NiFi 中的 Controller Service

NiFi 中的 Controller Service

  • 1.Service 简介
    • 1.1 Controller Service 的配置
      • 1.1.1 SETTING 基础属性
      • 1.1.2 PROPERTIES 使用属性
      • 1.1.3 COMMENT 页签
    • 1.2 Service 的使用范围
  • 2.全局参数配置
  • 3.DBCPConnectionPool 的使用样例
  • 4.在 ExcuseGroovyScript 组件中使用 Service

1.Service 简介

首先 NiFi 中的 Controller Service 和我们 MVC 概念中的 Controller Service 不是一个概念,NiFi 中的 Controller Service 更像是和 Processor 同级的一个概念,它和 Processor 在我个人的使用经验来理解的话就是 它是预制好的各种服务,可以被 Processor 引用或者支撑 Processor,例如一个 SQL 读取的 Processor,它得需要 JDBC 的连接,才能访问数据库。这里 Controller Service 就可以是一个 JDBC 的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发 Processor 一样,根据自己的业务需求,进行自定义的 Controller Service 开发。

当我们使用某些依赖 Service 的组件(Processor)时,在配置中会出现选择 Service 或者创建新的 Service 的情况,这里的 Service 即是 NiFi 的 Controller Service,一旦创建新的,则会生成一个以 Group 为范围的 “全局” Service 对象,这时,再有依赖同类型 Service 的 Processor 时,可以直接选中。

在这里插入图片描述
在这里插入图片描述

1.1 Controller Service 的配置

单独查看 Controller Service 可以从面板空白处,右键 Configure 来看,如下图:

在这里插入图片描述
这是一个 JDBC 的连接池 Service,它包含的属性有 名称类型简介启用状态操作;从操作中可以看到配置该 Service 需要填写基本的各类属性;其中,Service 是有启停状态的,如果想修改 Service 的属性内容,必须先保证该 Service 是停用状态,然后点击配置标识,则进入配置页面,它的配置和 Processor 的差不多,通过页签区别,共有三个页签:SETTING基础属性)、PROPERTIES使用属性)、COMMENT页签)。

1.1.1 SETTING 基础属性

基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此 Service 的 Processor 列表。

1.1.2 PROPERTIES 使用属性

在这里插入图片描述
核心的业务配置,此标签页的配置项根据不同的 Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是 JDBC 的连接池,所以基本需要连接数据库所需的 URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar 包路径 ,这里不少 Service 可能都需要第三方的 jar 包依赖才可以使用,长期使用或生产环境下,建议将所有 jar 资源集中放在统一路径下。

1.1.3 COMMENT 页签

在这里插入图片描述
一个提供 Service 使用说明的页签,可根据自己实际需求,补充使用 Service 的用法以及描述。

1.2 Service 的使用范围

在 NiFi 中,Group 同时也对 Service 起作用,如果我们在一个 NiFi 的最外层的平面上新增 Controller Service,那么这些 Service 的作用域是整个 NiFi 的任何位置,如果我们在某个 Group 内创建 Controller Service, 那么这个 Controller Service 仅在 Group 范围内可以被引用,NiFi 的这种机制也是方便 Service 的使用和维护。

在这里插入图片描述

2.全局参数配置

类似于数据库连接池、Kafka、Redis 等各种组件的连接池、客户端 Client 的 Service 在实际的使用中会非常多,由此配置的 Service 也会非常多,于是就会产生很多次的反复配置 URL、账号这一系列重复的内容,由于 NiFi 的特性,这些 Service 又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个 Group 来回跳转、调整不同的 Service 的 Configure;为应对此类问题,NiFi 提供了全局配置的机制来弥补。

使用变量前:

在这里插入图片描述
这里的 URLDriver Class NameDatabase User 在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变的,可能只需要配置一遍就好,不需要每次创建 Service 都写一遍;所以可以这里可以使用上下文变量(Parameter Context)。

首先,打开 Parameter Context,创新一组新的变量:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
再进入 CONTROLLER SERVICES 对 Service 的配置进行修改,将具体的 URLDriver Class NameDatabase User 等参数,全部使用变量替换(变量使用 # 符 )。

在这里插入图片描述

3.DBCPConnectionPool 的使用样例

下面将使用 NiFi 实现一个简单的 Demo,从 MySQL 数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用 ExecuteSQL 组件,读取 MySQL 中的数据,根据上文描述,创建一个 DBCPConnectionPool 的 Service,然后启动:

在这里插入图片描述
添加 ExecuteSQL 组件,配置相关内容,根据自定义编写的 SQL 读取数据库内容:

在这里插入图片描述
随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用 Groovy 处理数据,所以选择转换为 JSON 进行处理,实际使用可以根据自身情况选择转换器:

在这里插入图片描述
添加 ExecuteGroovyScript 组件,使用 Groovy 脚本对数据进行处理。

在这里插入图片描述
Groovy 的脚本内容如下:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;

def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
    //在这里可以对数据进行处理
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}

/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}

/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

最后使用 LogMessage 组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等。

在这里插入图片描述
在这里插入图片描述

4.在 ExcuseGroovyScript 组件中使用 Service

ExcuseGroovyScript 组件内部使用 Groovy 脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript 组件支持可以引入 Service,提供用户编写的 Groovy 脚本内部使用 Service;

首先需要在 ExcuteGroovyScript 组件的 PROPERTIES 配置中新增属性:

在这里插入图片描述
这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 SQL. 或者 CTL. 作为名称前缀时,则能够使用 Service,后续的属性值则会变成 Service 的选择。

在 Groovy 的代码中,则可以通过 SQL.mysql.{method} 的方式,调用 Service 的方法:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;

def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
    def mapdic = [:]
    // 使用 Service 查询数据库
    SQL.mysql.eachRow("SELECT id, value FROM tb_dic_detail WHERE u_status = 1 "){
       row->
           mapdic.put(row.id.toString(),row.value.toString());    
    }
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}


/***************************************************公共函数***************************************************/

/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}

/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/265031.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【EasyExcel实践】万能导出,一个接口导出多张表以及任意字段(可指定字段顺序)-简化升级版

文章目录 前言正文一、项目简介二、核心代码2.1 pom.xml 依赖配置2.2 ExcelHeadMapFactory2.3 ExcelDataLinkedHashMap2.4 自定义注解 ExcelExportBean2.5 自定义注解 ExcelColumnTitle2.6 建造器接口 Builder2.7 表格工具类 ExcelUtils2.8 GsonUtil2.9 模版类 ExportDynamicCo…

【每日一题】得到山形数组的最少删除次数

文章目录 Tag题目来源解题思路方法一&#xff1a;最长递增子序列 写在最后 Tag 【最长递增子序列】【数组】【2023-12-22】 题目来源 1671. 得到山形数组的最少删除次数 解题思路 方法一&#xff1a;最长递增子序列 前后缀分解 根据前后缀思想&#xff0c;以 nums[i] 为山…

最新ChatGPT网站系统源码+AI绘画系统+支持GPT语音对话+详细图文搭建教程/支持GPT4.0/H5端系统/文档知识库

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;可以说SparkAi是目前国内一款的ChatGPT对接OpenAI软件系统。那么如何搭建部署AI创作Ch…

TYPE C 接口知识详解

1、Type C 概述 Type-C口有4对TX/RX分线&#xff0c;2对USBD/D-&#xff0c;一对SBU&#xff0c;2个CC&#xff0c;另外还有4个VBUS和4个地线。 当Type-C接口仅用作传输DP信号时&#xff0c;则可利用4对TX/RX&#xff0c;从而实现4Lane传输&#xff0c;这种模式称为DPonly模式…

C++ 检测 是不是 com组件 的办法 已解决

在日常开发中&#xff0c;遇到动态库和 com组件库的调用 无法区分。检测是否com组件的办法 在头部文件&#xff0c;引入文件 如果能编译成功说明是 com组件&#xff0c;至于动态库如何引入&#xff0c;还在观察中 #import "TerraExplorerX.dll" no_namespace, nam…

云原生之深入解析基于FunctionGraph在Serverless领域的FinOps的探索和实践

一、背景 Serverless 精确到毫秒级的按用付费模式使得用户不再需要为资源的空闲时间付费。然而&#xff0c;对于给定的某个应用函数&#xff0c;由于影响其计费成本的因素并不唯一&#xff0c;使得用户对函数运行期间的总计费进行精确的事先估计变成了一项困难的工作。以传统云…

TCP_滑动窗口介绍

简介 TCP协议中有两个窗口&#xff0c;滑动窗口和拥塞窗口&#xff0c;两者均是一种流控机制&#xff1b;滑动窗口是接收方的流控机制&#xff0c;拥塞窗口是发送方的流控机制。 本文介绍滑动窗口&#xff0c;接收方为TCP连接设置了接收缓存。当TCP连接接收到正确、按序的字节…

Mybatis3系列课程8-带参数查询

简介 上节课内容中讲解了查询全部, 不需要带条件查, 这节我们讲讲 带条件查询 目标 1. 带一个条件查询-基本数据类型 2.带两个条件查询-连个基本数据类型 3.带一个对象类型查询 为了实现目标, 我们要实现 按照主键 查询某个学生信息, 按照姓名和年级编号查询学生信息 按照学生…

MyBatis中延迟加载,全局和局部的开启使用与关闭

文章目录 MyBatis中延迟加载&#xff0c;全局和局部的开启使用与关闭1、问题提出2、延迟加载和立即加载延迟加载立即加载 3、三种对应的表关系中的加载4、打开全局延迟加载&#xff08;实现一对一的延迟加载&#xff09;5、实现一对多的延迟加载&#xff08;将上面设置的全局延…

渲染控制之条件渲染

目录 1、使用规则 2、更新机制 3、使用if进行条件渲染 4、if ... else ...语句和子组件状态 5、嵌套if语句 ArkTS提供了渲染控制的能力。条件渲染可根据应用的不同状态&#xff0c;使用if、else和else if渲染对应状态下的UI内容。 1、使用规则 支持if、else和else if语句…

pip 常用指令 pip list 命令用法介绍

&#x1f4d1;pip 常用命令归类整理 pip list 是一个用于列出已安装的 Python 包的命令。这个命令会显示出所有已安装的包&#xff0c;以及它们的版本号。 pip list 命令有以下参数 -o, --outdated&#xff1a;列出所有过时的包&#xff0c;即有新版本可用的包。-u, --uptod…

DPDK单步跟踪(3)-如何利用visual studio 2019和visual gdb来单步调试dpdk

准备工作 因为时间的关系&#xff0c;我想到哪说到哪&#xff0c;可能没那么高的完成度。 但其实有心的人&#xff0c;看到这个标题&#xff0c;就关了本文自己能做了。 why和how to build debug version DPDK,见前两篇。这里我们准备开始。 首先&#xff0c;你有一台linux机…

什么是“人机协同”机器学习?

“人机协同”&#xff08;HITL&#xff09;是人工智能的一个分支&#xff0c;它同时利用人类智能和机器智能来创建机器学习模型。在传统的“人机协同”方法中&#xff0c;人们会参与一个良性循环&#xff0c;在其中训练、调整和测试特定算法。通常&#xff0c;它的工作方式如下…

《软件方法(下)》8.2.4 类和属性的命名

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 8.2 建模步骤C-1 识别类和属性 8.2.4 类和属性的命名 8.2.4.2 关于DDD话语中的“通用语言” DDD&#xff08;领域驱动设计&#xff09;话语中有“通用语言&#xff08;Ubiquitous L…

【JAVA面试题】什么是代码单元?什么是码点?

&#x1f34e;个人博客&#xff1a;个人主页 &#x1f3c6;个人专栏&#xff1a; JAVA ⛳️ 功不唐捐&#xff0c;玉汝于成 目录 目录 前言 思路 代码单元&#xff08;Code Unit&#xff09;&#xff1a; 码点&#xff08;Code Point&#xff09;&#xff1a; 作…

vscode | python | remote-SSH | Debug 配置 + CLIP4Clip实验记录

安装Extension 本地安装Remote-SSH、python 远程服务器上安装Python 难点&#xff1a;主机和远程服务器上安装Python扩展失败&#xff0c;可能是网络、代理等原因导致解决方法&#xff1a; 主机在官方网站下载Python扩展&#xff1a;https://marketplace.visualstudio.com/it…

RobotFramework 自动化测试实战进阶篇

工具 Robotframework, 采用PO设计模式 PO模型 PO模型即Page Objects&#xff0c;直译意思就是“页面对象”&#xff0c;通俗的讲就是把一个页面&#xff0c;或者说把一个页面的某个区域当做一个对象&#xff0c;通过封装这个对象可以实现调用。 PO设计的好处 代码复用&…

【沁恒蓝牙mesh】CH58x DataFlash 详解

本文主要介绍了 沁恒蓝牙芯片 CH58x 的 DataFlash 分区以及读写操作以及原理 &#x1f4cb; 个人简介 &#x1f496; 作者简介&#xff1a;大家好&#xff0c;我是喜欢记录零碎知识点的小菜鸟。&#x1f60e;&#x1f4dd; 个人主页&#xff1a;欢迎访问我的 Ethernet_Comm 博…

P3375 【模板】KMP

【模板】KMP 题目描述 给出两个字符串 s 1 s_1 s1​ 和 s 2 s_2 s2​&#xff0c;若 s 1 s_1 s1​ 的区间 [ l , r ] [l, r] [l,r] 子串与 s 2 s_2 s2​ 完全相同&#xff0c;则称 s 2 s_2 s2​ 在 s 1 s_1 s1​ 中出现了&#xff0c;其出现位置为 l l l。 现在请你求…

链表常见题型(1)

1.反转链表 1.1反转链表 如果我们想要反转链表&#xff0c;那应该有head的next指针指向空&#xff0c;其余结点的next指针反过来&#xff0c;指向它的上一个结点&#xff0c;那我们在执行该操作的时候就需要定义变量cur(current)表示我们当前遍历到的结点&#xff0c;变量pre(…