Go语言——goflow工作流使用

news2025/5/11 9:35:39

一、引入依赖

这个很坑,他不允许连接带密码的redis,只能使用不带密码的redis,要带密码的话得自己改一下源代码,无语

go get github.com/s8sg/goflow

二、画出我们的工作流程

在这里插入图片描述

三、编写代码

package main

import (
	"encoding/json"
	"fmt"
	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
	"log"
	"math/rand"
	"strconv"
)

// Input 输入一个数字
func Input(data []byte, option map[string][]string) ([]byte, error) {
	var input map[string]int
	// 获取输入的数
	if err := json.Unmarshal(data, &input); err != nil {
		return nil, err
	}
	outputInt := input["input"]
	// 将数据交给工作流处理
	return []byte(strconv.Itoa(outputInt)), nil
}

// AddOne 加上10以内的一个随机整数
func AddOne(data []byte, option map[string][]string) ([]byte, error) {
	// 获取上一个工作流的数据
	num, _ := strconv.Atoi(string(data))
	outputInt := num + rand.Intn(10) + 1
	fmt.Println("AddOne = ", outputInt)
	// 交给下一个工作流处理
	return []byte(strconv.Itoa(outputInt)), nil
}

// AddTwo 加上10以内的一个随机整数
func AddTwo(data []byte, option map[string][]string) ([]byte, error) {
	num, _ := strconv.Atoi(string(data))
	outputInt := num + rand.Intn(10) + 1
	fmt.Println("AddTwo = ", outputInt)
	return []byte(strconv.Itoa(outputInt)), nil
}

// Aggregator 聚合节点
func Aggregator(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("Aggregator = ", string(data))
	return data, nil
}

// Expand10 扩大10倍
func Expand10(data []byte, option map[string][]string) ([]byte, error) {
	num, _ := strconv.Atoi(string(data))
	outputInt := num * 10
	fmt.Println("Expand10 = ", outputInt)
	return []byte(strconv.Itoa(outputInt)), nil
}

// Expand100 扩大100倍
func Expand100(data []byte, option map[string][]string) ([]byte, error) {
	num, _ := strconv.Atoi(string(data))
	outputInt := num * 100
	fmt.Println("Expand100 = ", outputInt)
	return []byte(strconv.Itoa(outputInt)), nil
}

// Output 输出节点
func Output(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("Output = ", string(data))
	return data, nil
}

// 定义我们自己的一个流程
func MyFlow(workflow *flow.Workflow, context *flow.Context) error {
	// 创建DAG
	dag := workflow.Dag()
	// 创建节点
	dag.Node("input", Input)
	dag.Node("add-one", AddOne)
	dag.Node("add-two", AddTwo)
	// 这个聚合节点,就需要拿到add-one和add-two的结果
	dag.Node("aggregator", Aggregator, flow.Aggregator(func(m map[string][]byte) ([]byte, error) {
		addOneResult, _ := strconv.Atoi(string(m["add-one"]))
		addTwoResult, _ := strconv.Atoi(string(m["add-two"]))
		num := addOneResult + addTwoResult
		fmt.Println("aggregator = ", num)
		return []byte(strconv.Itoa(num)), nil
	}))
	// 这个方式是获取到节点的数据进行判断,然后返回一个字符串数组
	f1 := func(bytes []byte) []string {
		num, _ := strconv.Atoi(string(bytes))
		fmt.Println("ConditionalBranch = ", num)
		if num > 10 {
			return []string{"moreThan"}
		}
		return []string{"lessThan"}
	}
	// 这个方法就是将分支的数据返回给output
	f2 := func(m map[string][]byte) ([]byte, error) {
		if v, ok := m["moreThan"]; ok {
			i, _ := strconv.Atoi(string(v))
			fmt.Println("f2 moreThan = ", i)
			return v, nil
		}
		if v, ok := m["lessThan"]; ok {
			i, _ := strconv.Atoi(string(v))
			fmt.Println("f2 lessThan = ", i)
			return v, nil
		}
		return nil, nil
	}
	// 创建一个条件分支节点
	branches := dag.ConditionalBranch("judge", []string{"moreThan", "lessThan"}, f1, flow.Aggregator(f2))
	branches["moreThan"].Node("expand-10", Expand10)
	branches["lessThan"].Node("expand-100", Expand100)
	dag.Node("output", Output)
	// 构建关系
	dag.Edge("input", "add-one")
	dag.Edge("input", "add-two")
	dag.Edge("add-one", "aggregator")
	dag.Edge("add-two", "aggregator")
	dag.Edge("aggregator", "judge")
	dag.Edge("judge", "output")
	return nil
}

func main() {
	fs := goflow.FlowService{
		Port:              10001,
		RedisURL:          "127.0.0.1:6379",
		RedisPwd:          "p@ssw0rd",
		WorkerConcurrency: 5,
		RetryCount:        0,
	}
	if err := fs.Register("myFlow", MyFlow); err != nil {
		log.Printf("goflow register err: %v\n", err)
		return
	}
	if err := fs.Start(); err != nil {
		panic(err)
	}
}

四、Postman测试

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2373016.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

yarn npm pnpm

1 下载方式 npm 之前串行下载 现在并行下载 yarn 并行下载 加入缓存复用 pnpm 硬连接 避免重复下载,先检查本地是否存在,存在的话直接连接过去

Block Styler——字符串控件

字符串控件的应用 参考官方帮助案例:(这个方式感觉更好,第二种方式也可以)E:\NX1980\UGOPEN\SampleNXOpenApplications\C\BlockStyler\ColoredBlock 普通格式: 读取: //方法一 string0->GetProperti…

LangGraph(三)——添加记忆

目录 1. 创建MemorySaver检查指针2. 构建并编译Graph3. 与聊天机器人互动4. 问一个后续问题5. 检查State参考 1. 创建MemorySaver检查指针 创建MemorySaver检查指针: from langgraph.checkpoint.memory import MemorySavermemory MemorySaver()这是位于内存中的检…

【无标题】I/O复用(epoll)三者区别▲

一、SOCKET-IO复用技术 定义:SOCKET - IO复用技术是一种高效处理多个套接字(socket)的手段,能让单个线程同时监听多个文件描述符(如套接字)上的I/O事件(像可读、可写、异常)&#x…

ClassLoader类加载机制的核心引擎

ClassLoader类加载机制的核心引擎 文章目录 ClassLoader类加载机制的核心引擎1. ClassLoader基础1.1 什么是ClassLoader?1.2 ClassLoader的层次结构1.3 类加载的过程 2. 源码解析与工作原理2.1 ClassLoader的核心方法2.2 双亲委派模型的工作原理2.3 打破双亲委派模型…

tryhackme——Enumerating Active Directory

文章目录 一、凭据注入1.1 RUNAS1.2 SYSVOL1.3 IP和主机名 二、通过Microsoft Management Console枚举AD三、通过命令行net命令枚举四、通过powershell枚举 一、凭据注入 1.1 RUNAS 当获得AD凭证<用户名>:<密码>但无法登录域内机器时&#xff0c;runas.exe可帮助…

【Linux学习笔记】系统文件IO之重定向原理分析

【Linux学习笔记】系统文件IO之重定向原理分析 &#x1f525;个人主页&#xff1a;大白的编程日记 &#x1f525;专栏&#xff1a;Linux学习笔记 文章目录 【Linux学习笔记】系统文件IO之重定向原理分析前言一. 系统文件I/01.1 一种传递标志位的方法1.2 hello.c写文件:1.3 he…

SpringBoot中使用MCP和通义千问来处理和分析数据-连接本地数据库并生成实体类

文章目录 前言一、正文1.1 项目结构1.2 项目环境1.3 完整代码1.3.1 spring-mcp-demo的pom文件1.3.2 generate-code-server的pom文件1.3.3 ChatClientConfig1.3.4 FileTemplateConfig1.3.5 ServiceProviderConfig1.3.6 GenerateCodeController1.3.7 Columns1.3.8 Tables1.3.9 Fi…

实现滑动选择器从离散型的数组中选择

1.使用原生的input 详细代码如下&#xff1a; <template><div class"slider-container"><!-- 滑动条 --><inputtype"range"v-model.number"sliderIndex":min"0":max"customValues.length - 1"step&qu…

基于Credit的流量控制

流量控制(Flow Control)&#xff0c;也叫流控&#xff0c;它是控制组件之间发送和接收信息的过程。在总线中&#xff0c;流控的基本单位称为flit。 在标准同步接口中(比如AXI协议接口)&#xff0c;握手信号如果直接采用寄存器打拍的方式容易导致信号在不同的方向上出现偏离。因…

【金仓数据库征文】金仓数据库KingbaseES: 技术优势与实践指南(包含安装)

目录 前言 引言 一 : 关于KingbaseES,他有那些优势呢? 核心特性 典型应用场景 政务信息化 金融核心系统&#xff1a; 能源通信行业&#xff1a; 企业级信息系统&#xff1a; 二: 下载安装KingbaseES 三:目录一览表: 四:常用SQL语句 创建表&#xff1a; 修改表结构…

金丝猴食品:智能中枢AI-COP构建全链路数智化运营体系

“金丝猴奶糖”&#xff0c;这个曾藏在无数人童年口袋里的甜蜜符号&#xff0c;如今正经历一场数智焕新。当传统糖果遇上数字浪潮&#xff0c;这家承载着几代人味蕾记忆的企业&#xff0c;选择以数智化协同运营平台为“新配方”&#xff0c;将童年味道酿成智慧管理的醇香——让…

java的输入输出模板(ACM模式)

文章目录 1、前置准备2、普通输入输出API①、输入API②、输出API 3、快速输入输出API①、BufferedReader②、BufferedWriter 案例题目描述代码 面试有时候要acm模式&#xff0c;刷惯leetcode可能会手生不会acm模式&#xff0c;该文直接通过几个题来熟悉java的输入输出模板&…

鸿蒙 所有API缩略图鉴

从HarmonyOS NEXT Developer Preview1&#xff08;API 11&#xff09;版本开始&#xff0c;HarmonyOS SDK以 Kit 维度提供丰富、完备的开放能力&#xff0c;涵盖应用框架、应用服务、系统、媒体、AI、图形在内的六大领域&#xff0c;共计30000个API

【Docker系列】使用格式化输出与排序技巧

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

基础语法(二)

Mysql基础语法&#xff08;二&#xff09; Mysql基础语法&#xff08;二&#xff09;主要介绍Mysql中稍微进阶一点的内容&#xff0c;会稍微有一些难度&#xff08;博主个人认为&#xff09;。学习完基础语法&#xff08;一&#xff09;和基础语法&#xff08;二&#xff09;之…

TOA的定位,建模与解算的步骤、公式推导

TOA(到达时间)定位的核心是通过测量信号从目标到多个基站的传播时间,将其转换为距离信息,并利用几何关系解算目标位置。本文给出具体的建模与解算步骤及公式推导 文章目录 通用模型建立非线性方程组构建线性化处理(最小二乘法)最大似然估计(ML)高斯-牛顿迭代法误差分析…

2025年PMP 学习七 -第5章 项目范围管理 (5.4,5.5,5.6 )

2025年PMP 学习七 -第5章 项目范围管理 5.4 创建 WBS 1.定义与作用 定义把项目可交付成果和项目工作分解成较小的&#xff0c;更易于管理的组件作用对所要交付的内容提供一个结构化的视图 2.输入&#xff0c;输出&#xff0c;工具与技术 3. 创建WBS的依据&#xff08;输入&…

CAD属性图框值与Excel联动(CAD块属性导出Excel、excel更新CAD块属性)——CAD c#二次开发

CAD插件实现块属性值与excel的互动&#xff0c;效果如下&#xff1a; 加载dll插件&#xff08;CAD 命令行输入netload &#xff0c;运行xx即可导出Excel&#xff0c;运行xx1即可根据excel更新dwg块属性值。&#xff09; 部分代码如下 // 4. 开启事务更新CAD数据using (Transact…

【HarmonyOS 5】鸿蒙中进度条的使用详解

【HarmonyOS 5】鸿蒙中进度条的使用详解 一、HarmonyOS中Progress进度条的类型 HarmonyOS的ArkUI框架为开发者提供了多种类型的进度条&#xff0c;每种类型都有其独特的样式&#xff0c;以满足不同的设计需求。以下是几种常见的进度条类型&#xff1a; 线性进度条&#xff08;…