SmartETL函数式组件的设计与应用

news2025/5/19 12:42:28

SmartETL框架主要采用了面向对象的设计思想,将ETL过程中的处理逻辑抽象为LoaderProcessor(对应loader模块和iterator模块),所有流程组件需要继承或实现DataProvider(iter方法)或JsonIteratoron_data__process__方法)。

例如以下代码实现将论文结构中的摘要和正文拼接为一个字符串字段,方便后续对论文建立全文索引。

class ConcatPaperContent(JsonIterator):
    """
    arxiv html页面数据处理类
    """

    def on_data(self, data: Any, *args):
        paper = data['paper']
        content = ""
        if paper:
            abstract = paper.get('abstract')
            content += f"{abstract}\n"
            sections = paper.get('sections')
            for section in sections:
                content += f"{section['content']}\n"
        data['content'] = content
        return data

然而,业务中很多处理逻辑比较简单,以往开发时用少数几行代码就可以搞定,而在SmartETL框架中,则必须实现一个类,正如上面的例子所示。虽然SmartETL支持加载外部包的组件(只要在sys.path中),但如果是需要定制开发则相对繁琐。

此前,在过滤组件(Filter)中考虑到这种情况,解决办法是在流程中定义Lambda表达式。例如以下流程定义中,filter节点通过Lambda表达式abnormal_time实现过滤publish_time字段值小于当前时间的记录的功能,即,对于经过filter节点的记录,仅当其publish_time字段值大于等于当前时间current时才会输出给后续节点。

nodes:
  current: util.dates.current_ts(True)
  abnormal_time: "=lambda t, current=current: t >= current "
  filter: Filter(abnormal_time, key='publish_time')

为了简化业务代码编写,SmartETL新增实现函数式组件,即以函数形式提供核心处理逻辑,而不需要封装成类。Lambda表达式就是一种特殊的函数。

跟C/C++、Java不同,Python语言中函数是一等公民,即开发者可以直接访问和操作函数,支持将函数作为一个对象进行加载、传递和管理,这对于开发一些高级功能,提高程序扩展性非常方便。

SmartETL函数式组件是指将任意编写的数据处理函数作为ETL流程组件,加入到流程处理中。唯一的限制是:除了作为Loader组件的函数外(框架无法提供输入),函数应该以流程数据作为输入参数,并将需要向后续流程传递的数据作为输出参数。以下表格说明了函数的参数与节点类型作用的对应关系:

节点类型是否支持输入是否要求有输出
Loader节点否(可通过配置提供)
Processor节点是(流程数据作为第一个参数)均可

为了使用函数对象,框架设计了函数式Loader组件Function如下:

class Function(DataProvider):
    """函数调用包装器 提供调用函数的结果"""
    def __init__(self, function, *args, **kwargs):
        """
        :param function 函数对象或函数对象的完整限定名(如wikidata_filter.util.files.get_lines)
        """
        assert function is not None, "function is None!"
        if isinstance(function, str):
            from wikidata_filter.util.mod_util import load_cls
            function = load_cls(function)[0]
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def iter(self):
        """DataProvider的主要API,对提供函数进行调用"""
        # 注意,使用了组件构造参数
        res = self.function(*self.args, **self.kwargs)
        if isinstance(res, GeneratorType):
            for item in res:
                yield item
        else:
            yield res

类似的,框架实现了Function(JsonIterator)。常用的Map组件也支持提供函数对象或函数对象完整限定名。

基于函数式组件对本文开头的示例进行改写,代码如下:

def concat_paper_content(paper: dict):
    paper = paper or {}
    abstract = paper.get('abstract')
    content = f"{abstract}\n"
    sections = paper.get('sections')
    for section in sections:
        content += f"{section['content']}\n"
    return content

在yaml流程中进行引用,如下所示:

nodes:
  concat: Map('gestata.arxiv.concat_paper_content')

或者:

nodes:
  concat: Function('wikidata_filter.gestata.arxiv.concat_paper_content')

流程说明:通过yaml流程文件,将concat_content函数与Map进行绑定(假设该函数定义在wikidata_filter.gestata.arxiv模块中),实现对基于paper的处理,并将函数调用返回值作为content字段值。

注意,为了支持Function使用自定义组件(可能在任意sys.path可访问模块),需要提供完整的函数对象限定名,本示例中包括顶层模块wikidata_filter

那么,MapFunction有什么区别呢?主要区别是Map主要是为了支持wikidata_filter.gestatawikidata_filter.util模块中定义的函数,且支持指定要处理的字段(通过key参数)和目标字段(通过target_key参数)。

从示例中可以看出,使用函数式组件至少有几点好处:

  • 代码更简洁:只需要实现一个提供核心处理逻辑的函数即可。
  • 配置更加灵活:通过流程指定输入字段和输出字段,可以灵活适配不同业务数据。
  • 复用性更好:可以通过代码或yaml配置进行复用。

在此前arXiv论文数据处理应用流程中,大量采用了函数式组件。具体可查看https://github.com/ictchenbo/SmartETL/blob/main/wikidata_filter/gestata/arxiv.py了解详情。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2379285.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

精准掌控张力动态,重构卷对卷工艺设计

一、MapleSim Web Handling Library仿真和虚拟调试解决方案 在柔性材料加工领域,卷对卷(Roll-to-Roll)工艺的效率与质量直接决定了产品竞争力。如何在高动态生产场景中实现张力稳定、减少断裂风险、优化加工速度,是行业长期面临的…

永磁同步电机公式总结【一】——反电动势、磁链、转矩公式;三项、两项电压方程;坐标表换方程

一、PMSM 电机参数介绍 1.1 转子极数 转子极数 (Rotor Poles) :三相交流电机每组线圈都会产生 N、S 磁极,每个电机每相含有的永磁体磁极个数就是极数。由于磁极是成对出现的,所以电机有 2、4、6、8……极 (偶数)。 未知参数的电机&#xff…

STL - stack 和 queue 及容器适配器模式的介绍

文章目录 1. stack 的介绍和使用1.1 stack 的介绍1.2 stack 的接口及使用1.3 stack 的模拟实现 2. queue 的介绍和使用2.1 queue 的介绍2.2 queue 的接口及使用2.3 queue 的模拟实现 3. priority_queue的介绍和使用3.1 priority_queue 的介绍3.2 priority_queue 的接口及使用3.…

windows 安装gdal实现png转tif,以及栅格拼接

windows 安装gdal实现png转tif,以及栅格拼接 一、安装gdal 网上有很多安装gdal的方法,此处通过osgeo4w安装gdal 1.下载osgeo4w 下载地址 https://trac.osgeo.org/osgeo4w/ 2、安装osgeo4w exe文件安装,前面部分很简单,就不再…

Socket.IO是什么?适用哪些场景?

Socket.IO 详细介绍及适用场景 一、Socket.IO 是什么? Socket.IO 是一个基于事件驱动的 实时通信库,支持双向、低延迟的客户端-服务器交互。它底层结合了 WebSocket 和 HTTP 长轮询 等技术,能够在不同网络环境下自动选择最优传输方式&#x…

深度学习入门:卷积神经网络

目录 1、整体结构2、卷积层2.1 全连接层存在的问题2.2 卷积运算2.3 填充2.4 步幅2.5 3维数据的卷积运算2.6 结合方块思考2.7 批处理 3、池化层4、卷积层和池化层的实现4.1 4维数组4.2 基于im2col的展开4.3 卷积层的实现4.4 池化层的实现 5、CNN的实现6、CNN的可视化6.1 第一层权…

【Odoo】Pycharm导入运行Odoo15

【Odoo】Pycharm导入运行Odoo15 前置准备1. Odoo-15项目下载解压2. PsrtgreSQL数据库 项目导入运行1. 项目导入2. 设置项目内虚拟环境3. 下载项目中依赖4. 修改配置文件odoo.conf 运行Pycharm快捷运行 前置准备 1. Odoo-15项目下载解压 将下载好的项目解压到开发目录下 2. …

pytest框架 - 第二集 allure报告

一、断言assert 二、Pytest 结合 allure-pytest 插件生成美观的 Allure 报告 (1) 安装 allure 环境 安装 allure-pytest 插件:pip install allure-pytest在 github 下载 allure 报告文件 地址:Releases allure-framework/allure2 GitHub下载&#x…

pycharm连接github(详细步骤)

【前提:菜鸟学习的记录过程,如果有不足之处,还请各位大佬大神们指教(感谢)】 1.先安装git 没有安装git的小伙伴,看上一篇安装git的文章。 安装git,2.49.0版本-CSDN博客 打开cmd(…

oracle linux 95 升级openssh 10 和openssl 3.5 过程记录

1. 安装操作系统,注意如果可以选择,选择安装开发工具,主要是后续需要编译安装,需要gcc 编译工具。 2. 安装操作系统后,检查zlib 、zlib-dev是否安装,如果没有,可以使用安装镜像做本地源安装&a…

Text models —— BERT,RoBERTa, BERTweet,LLama

BERT 什么是BERT? BERT,全称Bidirectional Encoder Representations from Transformers,BERT是基于Transformer的Encoder(编码器)结构得来的,因此核心与Transformer一致,都是注意力机制。这种…

【AGI】大模型微调数据集准备

【AGI】大模型微调数据集准备 (1)模型内置特殊字符及提示词模板(2)带有系统提示和Function calling微调数据集格式(3)带有思考过程的微调数据集结构(4)Qwen3混合推理模型构造微调数据…

新能源汽车制动系统建模全解析——从理论到工程应用

《纯电动轻卡制动系统建模全解析:车速-阻力拟合、刹车力模型与旋转质量转换系数优化》 摘要 本文以纯电动轻卡为研究对象,系统解析制动系统建模核心参数优化方法,涵盖: 车速-阻力曲线拟合(MATLAB实现与模型验证&…

【Bluedroid】蓝牙HID DEVICE 报告发送与电源管理源码解析

本文基于Android蓝牙协议栈代码,深度解析HID设备(如键盘、鼠标)从应用层发送输入报告到主机设备的完整流程,涵盖数据封装、通道选择、L2CAP传输、电源管理四大核心模块。通过函数调用链(send_report → BTA_HdSendRepo…

第9章 组件及事件处理

9.1 Java Swing概述 图像用户界面(GUI) java.awt包,即Java抽象窗口工具包,Button(按钮)、TextField(文本框)、List(列表) javax.swing包 容器类&#xff08…

用golang实现二叉搜索树(BST)

目录 一、概念、性质二、二叉搜索树的实现1. 结构2. 查找3. 插入4. 删除5. 中序遍历 中序前驱/后继结点 一、概念、性质 二叉搜索树(Binary Search Tree),简写BST,又称为二叉查找树 它满足: 空树是一颗二叉搜索树对…

服务器防文件上传手写waf

一、waf的目录结构,根据自己目录情况进行修改 二、创建文件夹以及文件 sudo mkdir -p /www/server/waf-monitor sudo mkdir -p /www/server/waf-monitor/quarantine #创建文件夹 chmod 755 /www/server/waf-monitor #赋权cd /www/server/waf-monitor/touch waf-m…

计算机的基本组成与性能

1. 冯诺依曼体系结构:计算机组成的金字塔 1.1. 计算机的基本硬件组成 1.CPU - 中央处理器(Central Processing Unit)。 2.内存(Memory)。 3.主板(Motherboard)。主板的芯片组(Ch…

linux下编写shell脚本一键编译源码

0 前言 进行linux应用层编程时,经常会使用重复的命令对源码进行编译,然后把编译生成的可执行文件拷贝到工作目录,操作非常繁琐且容易出错。本文编写一个简单的shell脚本一键编译源码。 1 linux下编写shell脚本一键编译源码 shell脚本如下&…

【深度学习】#12 计算机视觉

主要参考学习资料: 《动手学深度学习》阿斯顿张 等 著 【动手学深度学习 PyTorch版】哔哩哔哩跟李沐学AI 目录 目标检测锚框交并比(IoU)锚框标注真实边界框分配偏移量计算损失函数 非极大值抑制预测 多尺度目标检测单发多框检测(S…