Nifi中的Controller Service

Service简介

首先Nifi中的Controller Service 和我们MVC概念中的Controller Service不是一个概念,Nifi中的Controller Service更像是和Processor同级的一个概念,它和Processor在我个人的使用经验来理解的话就是它是预制好的各种服务,可以被Processor引用或者支撑Processor,例如一个SQL读取的Processor,它得需要JDBC的连接,才能访问数据库。这里Controller Service 就可以是一个JDBC的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发Processor一样,根据自己的业务需求,进行自定义的Controller Service 开发。

当我们使用某些依赖Service的组件(Processor)时,在配置中会出现选择Service或者创建新的Service的情况,这里的Service即是Nifi的Controller Service,一旦创建新的,则会生成一个以Group为范围的 “全局” Service对象,这时,再有依赖同类型Service的Processor时,可以直接选中:

 

 Controller Service的配置

单独查看Controller Service 可以从面板空白处,右键Configure来看,如下图:

 这是一个JDBC的连接池Service,它包含的属性有名称、类型、简介、启用状态、操作;从操作中可以看到配置该Service需要填写基本的各类属性;其中,Service是有启停状态的,如果想修改Service的属性内容,必须先保证该Service是停用状态,然后点击配置标识,则进入配置页面,它的配置和Processor的差不多,通过页签区别,共有三个页签:SETTING(基础属性)、PROPERTIES(使用属性)、COMMENT(页签):

SETTING 基础属性

 基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此Service的Processor 列表

PROPERTIES 使用属性

核心的业务配置,此标签页的配置项根据不同的Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是JDBC的连接池,所以基本需要连接数据库所需的URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar包路径 ,这里不少Service可能都需要第三方的jar包依赖才可以使用,长期使用或生产环境下,建议将所有jar资源集中放在统一路径下。

COMMENT 页签 

一个提供Service使用说明的页签,可根据自己实际需求,补充使用Service的用法以及描述

Service 的使用范围

 在Nifi的基本使用中的Group的使用介绍,Group同时也对Services起作用,如果我们在一个Nifi的最外层的平面上 新增Controller Service,那么这些Service的作用域是整个Nifi的任何位置,如果我们在某个Group内创建Controller Service, 那么这个Controller Service 仅在Group范围内可以被引用,Nifi的这种机制也是方便Service的使用和维护

 

全局参数配置

类似于 数据库连接池、Kafka、Redis等各种组件的连接池、客户端Client的Service在实际的使用中会非常多,由此配置的Service也会非常多,于是就会产生很多次的反复配置URL、账号这一系列重复的内容,由于Nifi的特性,这些Service又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个group来回跳转、调整不同的Service的Configure;为应对此类问题,Nifi 提供了全局配置的机制来弥补。

 使用变量前:

这里的 URL、Driver Class Name、Database User在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变得,可能只需要配置一遍就好,不需要每次创建Service都写一遍;所以可以这里可以使用上下文变量(Parameter Context)

首先,打开Parameter Context,创新一组新的变量:

 

 

 

 

之后进入Service 的管控面板(空白处右键选择Configure),先选择变量组:

 

再进入 CONTROLLER SERVICES 对Service的配置进行修改,将具体的RUL、Driver-name、user等参数,全部使用变量替换(变量使用‘#’符 )

 

DBCPConnectionPool的使用样例

下面将使用Nifi 实现一个简单的Demo,从Mysql数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用ExecuteSQL组件,读取Mysql中的数据,根据上文描述,创建一个DBCPConnectionPool 的Service,然后启动 :

 添加 ExecuteSQL组件,配置相关内容,根据自定义编写的SQL读取数据库内容:

随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用groovy处理数据,所以选择转换为JSON进行处理,实际使用可以根据自身情况选择转换器:

添加 ExecuteGroovyScript 组件,使用groovy脚本对数据进行处理,groovy的脚本内容如下: 

 

groovy内容:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;



def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
   //在这里可以对数据进行处理
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}





/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}
/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

 最后使用LogMessage组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等

 

在ExcuseGroovyScript组件中使用Service

 在 ExcuseGroovyScript 组件内部使用groovy脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript组件支持可以引入Service,提供用户编写的groovy脚本内部使用Service;

首先需要在ExcuteGroovyScript组件的PROPERTIES  配置中新增属性:

 

这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 ‘SQL.’ 或者 'CTL.'作为名称前缀时,则能够使用Service,后续的属性值则会变成Service的选择。

在groovy的代码中,则可以通过 SQL.mysql.{method}的方式,调用Service的方法,在ExcuseScript组件中配合脚本语言进行数据的操作:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;



def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
    def mapdic = [:]
    //使用Service查询数据库
    SQL.mysql.eachRow("SELECT id,value FROM tb_dic_detail WHERE u_status = 1 "){
       row->
           mapdic.put(row.id.toString(),row.value.toString());    }
    
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}



/*****************************************************************公共函数*********************************************************************/

/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}
/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr =StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/794130.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

告别中央服务器:Syncthing实现点对点文件同步

介绍 Syncthing 是一款开源的文件同步工具&#xff0c;可让您在多个设备之间同步文件。 它适用于 Mac OS X、Windows、Linux、FreeBSD、Solaris、OpenBSD等系统。 可以通过浏览器访问来配置和监控该应用程序。 Syncthing 具有以下特点: 1、点对点同步 2、无需中央服务器 …

Python酷库之旅-第三方库Pandas(018)

目录 一、用法精讲 44、pandas.crosstab函数 44-1、语法 44-2、参数 44-3、功能 44-4、返回值 44-5、说明 44-6、用法 44-6-1、数据准备 44-6-2、代码示例 44-6-3、结果输出 45、pandas.cut函数 45-1、语法 45-2、参数 45-3、功能 45-4、返回值 45-5、说明 4…

11-《风信子》

风信子 风信子&#xff08;学名&#xff1a;Hyacinthus orientalis L.&#xff09;&#xff1a;是多年草本球根类植物&#xff0c;鳞茎卵形&#xff0c;有膜质外皮&#xff0c;皮膜颜色与花色成正相关&#xff0c;未开花时形如大蒜&#xff0c;原产地中海沿岸及小亚细亚一带&am…

从人工巡检到智能预警:视频AI智能监控技术在水库/河湖/水利防汛抗洪中的应用

一、背景需求分析 近日&#xff0c;我国多省市遭遇连日暴雨&#xff0c;导致水库、湖泊、河道等水域水位暴涨&#xff0c;城市内涝频发。随着夏季汛期的到来&#xff0c;降雨天气频繁&#xff0c;水利安全管理面临严峻挑战。为保障水库安全、预防和减少洪涝灾害&#xff0c;采…

java中Error与Exception的区别

java中Error与Exception的区别 1、错误&#xff08;Error&#xff09;1.1 示例 2、 异常&#xff08;Exception&#xff09;2.1 示例 3、 区别总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 当我们谈论编程中的错误&#xff08;Error&…

【zabbix7】开启HTTP authentication实现单点登录

开启HTTP authentication实现单点登录 一、新建http验证用户 htpasswd -c /etc/nginx/.htpasswd another_username # 在提示中输入密码二、新建Nginx配置文件 把zabbix.conf拷贝一份&#xff0c;然后修改listen监听的端口。 cp zabbx.conf zabbix_http.conf 每个location中新…

出现 failed to remove xxxx: Invalid argument 解决方法

目录 前言1. 问题所示2. 原理分析3. 解决方法 前言 这好像是一个Git的一个Bug&#xff0c;对应有个下下策的解决方式 1. 问题所示 Git提交的时候出现如下问题 Git warning:failed to remove debug.log:invalid argumentgit clean -f -1 --F&#xff1a;\xxx failed to rem…

YOLOv10改进 | 主干/Backbone篇 | 轻量级网络ShuffleNetV1(附代码+修改教程)

一、本文内容 本文给大家带来的改进内容是ShuffleNetV1&#xff0c;这是一种为移动设备设计的高效CNN架构。它通过使用点群卷积和通道混洗等操作&#xff0c;减少了计算成本&#xff0c;同时保持了准确性&#xff0c;通过这些技术&#xff0c;ShuffleNet在降低计算复杂度的同时…

初始网络知识

前言&#x1f440;~ 上一章我们介绍了使用java代码操作文件&#xff0c;今天我们来聊聊网络的一些基础知识点&#xff0c;以便后续更深入的了解网络 网络 局域网&#xff08;LAN&#xff09; 广域网&#xff08;WAN&#xff09; 路由器 交换机 网络通信基础 IP地址 端…

数据结构 —— FloydWarshall算法

数据结构 —— FloydWarshall算法 FloydWarshall算法三种最短路径算法比较1. Dijkstra算法2. Bellman-Ford算法3. Floyd-Warshall算法总结 我们之前介绍的两种最短路径算法都是单源最短路径&#xff0c;就是我们要指定一个起点来寻找最短路径&#xff0c;而我们今天介绍的Floyd…

YOLOv10改进 | Conv篇 | RCS-OSA替换C2f实现暴力涨点(减少通道的空间对象注意力机制)

一、本文介绍 本文给大家带来的改进机制是RCS-YOLO提出的RCS-OSA模块&#xff0c;其全称是"Reduced Channel Spatial Object Attention"&#xff0c;意即"减少通道的空间对象注意力"。这个模块的主要功能是通过减少特征图的通道数量&#xff0c;同时关注空…

MySQL实战45讲学习笔记(持续更新ing……)

文章目录 一、基础架构&#xff1a;一条SQL查询语句是如何执行的&#xff1f;概览连接器查询缓存分析器优化器执行器 二、日志系统&#xff1a;一条SQL更新语句是如何执行的&#xff1f;redo logbinlog两阶段提交 一、基础架构&#xff1a;一条SQL查询语句是如何执行的&#xf…

动态规划之数字三角形模型+最长上升子序列模型

首先&#xff0c;我们从集合角度重新看待DP&#xff1a; 直接看题&#xff1a;https://www.acwing.com/problem/content/1029/ 就是取纸条的原题&#xff0c;我们令f[i1,j1,i2,j2]表示从(1,1),(1,1)分别走到(i1,j1),(i2,j2)的路径的max i1j1i2j2&#xff0c;于是我们可以把状…

ArrayList----源码分析

源码中的简介&#xff1a; List接口的可调整数组实现。实现所有可选列表操作&#xff0c;并允许所有元素&#xff0c;包括null。除了实现List接口之外&#xff0c;这个类还提供了一些方法来操作内部用于存储列表的数组的大小。(这个类大致相当于Vector&#xff0c;只是它是不同…

广电日志分析系统

需求 广电集团中有若干个系统都产生日志信息&#xff0c;目前大约分布与70到80台服务器中&#xff0c;分别是windows与Linux操作系统。需要将服务器上产生的日志文件利用我们的技术进行解析 设计 每个日志工作站负责30-50个服务器的日志解析工作。可以根据实际需求进行设置&…

ESP32CAM物联网教学11

ESP32CAM物联网教学11 霍霍webserver 在第八课的时候&#xff0c;小智把乐鑫公司提供的官方示例程序CameraWebServer改成了明码&#xff0c;这样说明这个官方程序也是可以更改的嘛。这个官方程序有四个文件&#xff0c;一共3500行代码&#xff0c;看着都头晕&#xff0c;小智决…

基于Python+Flask+MySQL的新冠疫情可视化系统

基于PythonFlaskMySQL的新冠疫情可视化系统 FlaskMySQL 基于PythonFlaskMySQL的新冠疫情可视化系统 项目主要依赖前端&#xff1a;layui&#xff0c;Echart&#xff0c;后端主要是Flask&#xff0c;系统的主要支持登录注册&#xff0c;Ecahrt构建可视化图&#xff0c;可更换主…

Oracle序列迁移重建

原因&#xff1a;oracle数据导入后序列不一致 解决办法&#xff1a;从原库中导出一份最新的序列号&#xff0c;在目标库中导入 1.删除目标库该用户下的所有索引 select DROP SEQUENCE ||sequence_name || ; from dba_sequences where sequence_owner xxxxx;2.查询出所有序列…

edge 学习工具包 math solver

简介 推荐微软推出的学习工具中的两项工具&#xff1a;数学求解器和 pdf 阅读器。 打开 edge 学习工具包的方法 &#xff1a;右上角三点-更多工具-学习工具包。 math solver 除了基础的计算求解外&#xff0c;还用图标展示公式&#xff0c;清晰直观。 地址&#xff1a;求解…

《C语言程序设计 第4版》笔记和代码 第十一章 指针和数组

第十一章 指针和数组 11.1 指针和一维数组间的关系 1 由于数组名代表数组元素的连续存储空间的首地址&#xff0c;因此&#xff0c;数组元素既可以用下标法也可以用指针来引用。 例11.1见文末 2 p1与p在本质上是两个不同的操作&#xff0c;前者不改变当前指针的指向&#xf…