【大数据】NiFi 中的 Controller Service

news2025/6/9 13:17:59

NiFi 中的 Controller Service

  • 1.Service 简介
    • 1.1 Controller Service 的配置
      • 1.1.1 SETTING 基础属性
      • 1.1.2 PROPERTIES 使用属性
      • 1.1.3 COMMENT 页签
    • 1.2 Service 的使用范围
  • 2.全局参数配置
  • 3.DBCPConnectionPool 的使用样例
  • 4.在 ExcuseGroovyScript 组件中使用 Service

1.Service 简介

首先 NiFi 中的 Controller Service 和我们 MVC 概念中的 Controller Service 不是一个概念,NiFi 中的 Controller Service 更像是和 Processor 同级的一个概念,它和 Processor 在我个人的使用经验来理解的话就是 它是预制好的各种服务,可以被 Processor 引用或者支撑 Processor,例如一个 SQL 读取的 Processor,它得需要 JDBC 的连接,才能访问数据库。这里 Controller Service 就可以是一个 JDBC 的连接池服务。

同理,Controller Service 也是支持扩展的,可以像自定义开发 Processor 一样,根据自己的业务需求,进行自定义的 Controller Service 开发。

当我们使用某些依赖 Service 的组件(Processor)时,在配置中会出现选择 Service 或者创建新的 Service 的情况,这里的 Service 即是 NiFi 的 Controller Service,一旦创建新的,则会生成一个以 Group 为范围的 “全局” Service 对象,这时,再有依赖同类型 Service 的 Processor 时,可以直接选中。

在这里插入图片描述
在这里插入图片描述

1.1 Controller Service 的配置

单独查看 Controller Service 可以从面板空白处,右键 Configure 来看,如下图:

在这里插入图片描述
这是一个 JDBC 的连接池 Service,它包含的属性有 名称类型简介启用状态操作;从操作中可以看到配置该 Service 需要填写基本的各类属性;其中,Service 是有启停状态的,如果想修改 Service 的属性内容,必须先保证该 Service 是停用状态,然后点击配置标识,则进入配置页面,它的配置和 Processor 的差不多,通过页签区别,共有三个页签:SETTING基础属性)、PROPERTIES使用属性)、COMMENT页签)。

1.1.1 SETTING 基础属性

基础属性,包含左侧的名称,名称可以进行更改,右侧包含引用此 Service 的 Processor 列表。

1.1.2 PROPERTIES 使用属性

在这里插入图片描述
核心的业务配置,此标签页的配置项根据不同的 Service,配置内容不一致,具体的配置项以及使用,可以参考官方的文档;这里的是 JDBC 的连接池,所以基本需要连接数据库所需的 URL、数据库的账号密码、数据库的驱动类名称、驱动类的依赖 jar 包路径 ,这里不少 Service 可能都需要第三方的 jar 包依赖才可以使用,长期使用或生产环境下,建议将所有 jar 资源集中放在统一路径下。

1.1.3 COMMENT 页签

在这里插入图片描述
一个提供 Service 使用说明的页签,可根据自己实际需求,补充使用 Service 的用法以及描述。

1.2 Service 的使用范围

在 NiFi 中,Group 同时也对 Service 起作用,如果我们在一个 NiFi 的最外层的平面上新增 Controller Service,那么这些 Service 的作用域是整个 NiFi 的任何位置,如果我们在某个 Group 内创建 Controller Service, 那么这个 Controller Service 仅在 Group 范围内可以被引用,NiFi 的这种机制也是方便 Service 的使用和维护。

在这里插入图片描述

2.全局参数配置

类似于数据库连接池、Kafka、Redis 等各种组件的连接池、客户端 Client 的 Service 在实际的使用中会非常多,由此配置的 Service 也会非常多,于是就会产生很多次的反复配置 URL、账号这一系列重复的内容,由于 NiFi 的特性,这些 Service 又和组件(Processor)一样,四散在各处,这就使得维护和运维管理变得很繁琐,调试、调整、查看的时候,要不停的各个 Group 来回跳转、调整不同的 Service 的 Configure;为应对此类问题,NiFi 提供了全局配置的机制来弥补。

使用变量前:

在这里插入图片描述
这里的 URLDriver Class NameDatabase User 在实际生产环境中,可能都是固定的数据库和固定的服务,几乎不需要变的,可能只需要配置一遍就好,不需要每次创建 Service 都写一遍;所以可以这里可以使用上下文变量(Parameter Context)。

首先,打开 Parameter Context,创新一组新的变量:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
再进入 CONTROLLER SERVICES 对 Service 的配置进行修改,将具体的 URLDriver Class NameDatabase User 等参数,全部使用变量替换(变量使用 # 符 )。

在这里插入图片描述

3.DBCPConnectionPool 的使用样例

下面将使用 NiFi 实现一个简单的 Demo,从 MySQL 数据库中读取部分数据,将数据进行筛选,然后将数据输出;

首先,使用 ExecuteSQL 组件,读取 MySQL 中的数据,根据上文描述,创建一个 DBCPConnectionPool 的 Service,然后启动:

在这里插入图片描述
添加 ExecuteSQL 组件,配置相关内容,根据自定义编写的 SQL 读取数据库内容:

在这里插入图片描述
随后添加 ConvertAvroToJSON 组件,这里从数据库读出的数据是不可读的,为了方便查看调试、同时也是为了后续使用 Groovy 处理数据,所以选择转换为 JSON 进行处理,实际使用可以根据自身情况选择转换器:

在这里插入图片描述
添加 ExecuteGroovyScript 组件,使用 Groovy 脚本对数据进行处理。

在这里插入图片描述
Groovy 的脚本内容如下:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;

def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
    //在这里可以对数据进行处理
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}

/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}

/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

最后使用 LogMessage 组件作为接收数据,实际情况可以将数据转为下一处理节点或存储等等。

在这里插入图片描述
在这里插入图片描述

4.在 ExcuseGroovyScript 组件中使用 Service

ExcuseGroovyScript 组件内部使用 Groovy 脚本处理数据时,可能需要再次读取数据库或者使用其他第三方数据来辅助处理,这时候,ExcuteGroovyScript 组件支持可以引入 Service,提供用户编写的 Groovy 脚本内部使用 Service;

首先需要在 ExcuteGroovyScript 组件的 PROPERTIES 配置中新增属性:

在这里插入图片描述
这里,添加属性时,会让用户输入用户给该属性的命名,如果是普通命名,这里的属性仅仅作为静态数据而已,但是如果使用关键字 SQL. 或者 CTL. 作为名称前缀时,则能够使用 Service,后续的属性值则会变成 Service 的选择。

在 Groovy 的代码中,则可以通过 SQL.mysql.{method} 的方式,调用 Service 的方法:

import org.apache.commons.io.IOUtils;
import java.nio.charset.StandardCharsets;
import groovy.json.JsonBuilder;
import groovy.json.JsonOutput;
import groovy.json.JsonSlurper;
import groovy.json.StringEscapeUtils;
import java.util.*;

def dataJson = getInputJSONData()
if(null == dataJson){
    return;
}
def rss = []
for(int i = 0; i < dataJson.size();i++){
    def tem = dataJson.get(i);
    def mapdic = [:]
    // 使用 Service 查询数据库
    SQL.mysql.eachRow("SELECT id, value FROM tb_dic_detail WHERE u_status = 1 "){
       row->
           mapdic.put(row.id.toString(),row.value.toString());    
    }
    rss.add(tem.name);
}

// 输出
if(rss.size()>0){
    sendData(rss,REL_SUCCESS);
}


/***************************************************公共函数***************************************************/

/**
 * 读取输入流
 * @author GCC
 ***/
def getInputJSONData(){
    def flow = session.get()
    if(null == flow){
        log.error("the flow is null ...");
        return;
    }
    def dataJson = null;
    def jsonStr = "";
    session.read(flow,{
        inputStream ->
            jsonStr = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback);
    try{
        dataJson = new JsonSlurper().parseText(jsonStr);
    }catch(Exception e){
        log.error("输入流格式错误")
    }
    session.remove(flow);
    return dataJson;
}

/**
 *输出数据至后续管道
 *@param result 输出的数据
 *@param outStream 输出的管道
 *@author GCC
 ***/
void sendData(def result,def outStream){
    String successFlowFileStr = StringEscapeUtils.unescapeJavaScript(new JsonOutput().toJson(result).toString());
    def newflow = session.create();
    newflow = session.write(newflow, {
        outputStream ->
            outputStream.write(successFlowFileStr.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    session.transfer(newflow, outStream);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1330753.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面试题:什么是脚手架?为什么需要脚手架?常用的脚手架有哪些?

文章目录 前言脚手架介绍什么是脚手架 为什么需要脚手架不要重新造轮子 常用脚手架Vue框架MavenNettyJava EE 前言 微服务本身是一种架构风格&#xff0c;也是指导组织构建软件的一系列最佳实践集合。然而&#xff0c;业务团队在拆分应用后&#xff0c;会产生更多细粒度服务&a…

KingbaseV8R6单实例定时全量备份步骤

此场景为单机数据库节点内部备份&#xff0c;方便部署和操作&#xff0c;但备份REPO与数据库实例处于同一个物理主机&#xff0c;冗余度较低。 前期准备 配置ksql免密登录(必须) 在Kingbase数据库运行维护中&#xff0c;经常用到ksql工具登录数据库&#xff0c;本地免密登录…

@z-utils组 重构和自动化实现

highlight: monokai theme: github 包简介 z-utils组 是一个可以在vue/react/pure js 中使用的工具包&#xff0c;它包含三个子类&#xff0c;分别为 z-utils/base, z-utils/react, z-utils/vue 三个分别在不同区域使用。 他是原 zzy-javascript-devtools 的重构版本&#xf…

2016年第五届数学建模国际赛小美赛A题臭氧消耗预测解题全过程文档及程序

2016年第五届数学建模国际赛小美赛 A题 臭氧消耗预测 原题再现&#xff1a; 臭氧消耗包括自1970年代后期以来观察到的若干现象&#xff1a;地球平流层&#xff08;臭氧层&#xff09;臭氧总量稳步下降&#xff0c;以及地球极地附近平流层臭氧&#xff08;称为臭氧空洞&#x…

HarmonyOS 多态样式

还记得我们css中有 按压 失去焦点 点击后 正常状态 的各种样式设置 那么作为前端开发 TS JS的改版 harmonyos自然也有 这里 我们编写代码如下 Entry Component struct Index {build() {Row() {Column() {TextInput()TextInput().stateStyles({//正常状态normal: {.background…

WEB渗透—PHP反序列化(八)

Web渗透—PHP反序列化 课程学习分享&#xff08;课程非本人制作&#xff0c;仅提供学习分享&#xff09; 靶场下载地址&#xff1a;GitHub - mcc0624/php_ser_Class: php反序列化靶场课程&#xff0c;基于课程制作的靶场 课程地址&#xff1a;PHP反序列化漏洞学习_哔哩…

20231223使用Rockchip原厂的Android11调通Firefly的AIO-3399J开发板上的AP6356S

20231223使用Rockchip原厂的Android11调通Firefly的AIO-3399J开发板上的AP6356S 2023/12/23 14:14 开发板&#xff1a;Firefly的AIO-3399J【RK3399】 SDK&#xff1a;rk3399-android-11-r20211216.tar.xz【Android11】 Android11.0.tar.bz2.aa【ToyBrick】 Android11.0.tar.bz2…

Unity3D移动端实现摇一摇功能

手机摇一摇功能在平时项目开发中是很常见的需求&#xff0c;利用Unity的重力感应可以很方便的实现该功能。 Unity简化了重力感应的开发&#xff0c; 通过访问Input.acceleration属性&#xff0c;取回加速度传感器的值。首先我们看一下重力传感器的方向问题。Unity3D中重量的取…

MFC使用高速绘图控件high-speed Charting Control绘制柱形图

1. 创建MFC单文档工程BarChartDemo。 2. 在工程文件夹下新建文件夹ChartCtrl,将ChartCtrl源码放入,如下图所示。在工程中添加这些项,项目——添加——现有项,全部添加。 3. 添加一个对话框,ID为IDD_DLG_BAR,类名为CBarDlg。 4. 在对话框中添加Custom Control控件,将控…

机场信息集成系统系列介绍(7):机场航班信息显示系统FIDS

目录 一、简介 二、架构及相关功能 1、实时更新和显示航班信息 2、多屏显示与查询 3、提供登机口导航信息 4、发布机场公告 5、集成机场的其他延伸服务 6、支持多语言显示 7、监控与故障处理 8、数据分析与优化 9、与航空公司、地面代理的信息交互 10、安全保障与应…

Python数据科学视频讲解:嵌入法(随机森林、提升法、Logistic等)

4.5 嵌入法&#xff08;随机森林、提升法、Logistic等&#xff09; 视频为《Python数据科学应用从入门到精通》张甜 杨维忠 清华大学出版社一书的随书赠送视频讲解4.5节内容。本书已正式出版上市&#xff0c;当当、京东、淘宝等平台热销中&#xff0c;搜索书名即可。内容涵盖数…

【Hive_04】分区分桶表以及文件格式

1、分区表1.1 分区表基本语法&#xff08;1&#xff09;创建分区表&#xff08;2&#xff09;分区表读写数据&#xff08;3&#xff09;分区表基本操作 1.2 二级分区1.3 动态分区 2、分桶表2.1 分桶表的基本语法2.2 分桶排序表 3、文件格式与压缩3.1 Hadoop压缩概述3.2 Hive文件…

安捷伦Agilent 8720ES网络分析仪

Agilent安捷伦8720ES S-参数矢量网络分析仪 50MHz至20GHz 100 dB 的动态范围 优异的测量精度 2个测量通道 4个显示通道 频率和功率扫描 快扫描和数据传输速度 通过/失败测试&#xff0c;强大的标记功能 电校准&#xff08;ECal&#xff09; 内部使用测试序列的自动化 可选时域…

芯片到底是怎么访问外设

微型计算机的组成&#xff1a;CPURAM硬盘等 什么是FLASH&#xff1f; FLASH存储器又称闪存&#xff0c;它结合了ROM和RAM的长处&#xff0c;不仅具备电子可擦除可编程&#xff08;EEPROM&#xff09;的性能&#xff0c;还不会断电丢失数据同时可以快速读取数据&#xff08;NV…

matlab时间转换

采集的GNSS数据是10hz的。 data&#xff08;选取其中一部分&#xff09;如下&#xff1a; &#xff08;1&#xff09;char类型 formatOut yyyy-mm-dd HH:MM:SS; str datestr(data,formatOut); str如下&#xff1a; &#xff08;2&#xff09;double类型 DateVector dat…

Ai基本视图操作和快捷键设置

这个抓手可以用来拖动&#xff0c;左右的滑块可以用来实现上下左右的移动。 对于放大缩小图片有ctrl与ctrl-&#xff08;Alt滚轮&#xff09;如果要回到原来的大小则使ctrl1 Ai还具有像ppt一样的放映功能&#xff08;可以将工具栏或者其他栏的根据进行替换&#xff09;

使用html+css+js+three.js写圣诞树

实现效果&#xff1a; <head><meta charset"UTF-8"><title>Musical Christmas Lights</title><link rel"stylesheet" href"https://cdnjs.cloudflare.com/ajax/libs/normalize/5.0.0/normalize.min.css"><sty…

盲盒小程序搭建:开启互联网盲盒时代

盲盒目前是一个非常火爆的商业模式。随着科技的发展&#xff0c;盲盒市场也开始采用线上盲盒进行拓客&#xff0c;吸引盲盒爱好者。当下在互联网电商影响下&#xff0c;盲盒小程序逐渐受到了商家的青睐。 线上盲盒市场 盲盒消费主要是根据自身的未知性吸引消费者&#xff0c;消…

小白实战教学:开发同城外卖跑腿APP

本文将以"小白实战教学"为主题&#xff0c;向大家介绍如何从零开始&#xff0c;开发一款简单而实用的同城外卖跑腿APP。 一、准备工作 在开始之前&#xff0c;我们需要做一些准备工作。首先&#xff0c;确保你已经安装好了开发环境&#xff0c;包括合适的集成开发环…

09.list 容器

9、list 容器 功能&#xff1a; 将数据进行链式存储 链表&#xff08;list&#xff09;是一种物理存储单元上非连续的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接实现的 链表的组成&#xff1a; 链表由一系列结点组成 结点的组成&#xff1a; 一个是存…