别再手动点点点了!用Python脚本自动化调用Dify工作流API(附完整代码)
用Python脚本实现Dify工作流API的自动化调用与生产级实践。在数据处理和AI应用开发中，手动操作Web界面不仅效率低下，也难以应对批量任务的需求。本文将介绍如何通过Python脚本将Dify工作流API封装为可复用的自动化工具，并分享生产环境中常见的优化技巧。

## 1. 自动化调用的核心设计

### 1.1 基础API客户端封装

一个健壮的API客户端应该具备以下特性：可配置的连接参数、统一的错误处理机制、可扩展的请求方法。

```python
import requests
import json
from typing import Optional, Dict, Any

class DifyAutomationClient:
    def __init__(self, api_key: str, base_url: str = "https://api.dify.ai"):
        self.session = requests.Session()
        self.base_url = base_url.rstrip("/")
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        })

    def _make_request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        url = f"{self.base_url}{endpoint}"
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"API请求失败: {str(e)}")
            return None
```

### 1.2 文件上传与工作流执行的分离设计

将文件上传和工作流执行解耦，可以提高代码的复用性。

```python
    def upload_file(self, file_path: str) -> Optional[str]:
        """上传文件到Dify并返回文件ID"""
        endpoint = "/v1/files/upload"
        with open(file_path, "rb") as f:
            files = {"file": f}
            response = self._make_request("POST", endpoint, files=files)
        return response.get("id") if response else None

    def execute_workflow(self, workflow_id: str, inputs: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """执行指定工作流"""
        endpoint = f"/v1/workflows/{workflow_id}/run"
        payload = {
            "inputs": inputs,
            "response_mode": "blocking"
        }
        return self._make_request("POST", endpoint, json=payload)
```

## 2.
生产环境的关键增强功能

### 2.1 自动重试机制

网络不稳定时自动重试可以显著提高成功率。

```python
from time import sleep
from random import uniform

    def execute_with_retry(self, max_retries: int = 3, **kwargs):
        """带指数退避的自动重试机制"""
        for attempt in range(max_retries):
            response = self.execute_workflow(**kwargs)
            if response is not None:
                return response
            wait_time = min(5 * (2 ** attempt), 30)  # 指数退避，上限30秒
            sleep(wait_time * uniform(0.8, 1.2))  # 添加随机抖动
        raise Exception(f"操作失败，已达最大重试次数{max_retries}")
```

### 2.2 完善的日志记录

结构化日志对于问题排查至关重要。

```python
import logging
from datetime import datetime

class APILogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)
        handler = logging.FileHandler("dify_automation.log")
        formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)

    def log_operation(self, operation: str, status: str, metadata: dict = None):
        log_entry = {
            "operation": operation,
            "status": status,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        }
        self.logger.info(json.dumps(log_entry))
```

## 3. 批量任务处理方案

### 3.1 基于队列的任务调度

使用队列可以更好地管理批量任务。

```python
from queue import Queue
from threading import Thread

class TaskProcessor:
    def __init__(self, client: DifyAutomationClient, worker_count: int = 4):
        self.client = client
        self.task_queue = Queue()
        self.workers = [
            Thread(target=self._process_tasks, daemon=True)
            for _ in range(worker_count)
        ]

    def add_task(self, workflow_id: str, inputs: dict):
        self.task_queue.put((workflow_id, inputs))

    def start(self):
        for worker in self.workers:
            worker.start()

    def _process_tasks(self):
        while True:
            workflow_id, inputs = self.task_queue.get()
            try:
                result = self.client.execute_with_retry(
                    workflow_id=workflow_id,
                    inputs=inputs
                )
                # 处理结果...
```
```python
            finally:
                self.task_queue.task_done()
```

### 3.2 任务状态监控

实时监控有助于了解系统运行状况。

```python
class TaskMonitor:
    def __init__(self):
        self.success_count = 0
        self.failure_count = 0
        self.last_error = None

    def update(self, success: bool, error: str = None):
        if success:
            self.success_count += 1
        else:
            self.failure_count += 1
            self.last_error = error

    def get_stats(self) -> dict:
        return {
            "success": self.success_count,
            "failure": self.failure_count,
            "last_error": self.last_error,
            "completion_rate": self.success_count / max(1, self.success_count + self.failure_count)
        }
```

## 4. 安全与性能优化

### 4.1 敏感信息管理

永远不要在代码中硬编码敏感信息。

```python
import os
from dotenv import load_dotenv

load_dotenv()  # 从.env文件加载环境变量

client = DifyAutomationClient(
    api_key=os.getenv("DIFY_API_KEY"),
    base_url=os.getenv("DIFY_BASE_URL")
)
```

### 4.2 性能调优技巧

通过连接池和超时优化提升性能。

```python
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

    def configure_session(self):
        """配置优化的会话参数"""
        retry_strategy = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[408, 429, 500, 502, 503, 504]
        )
        adapter = HTTPAdapter(
            max_retries=retry_strategy,
            pool_connections=10,
            pool_maxsize=100
        )
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)
```

## 5.
实际应用案例

### 5.1 日报自动分析系统

```python
def process_daily_reports(client: DifyAutomationClient, report_dir: str):
    """处理目录下的所有日报文件"""
    logger = APILogger("daily_report")
    monitor = TaskMonitor()
    for filename in os.listdir(report_dir):
        if filename.endswith(".txt"):
            filepath = os.path.join(report_dir, filename)
            try:
                file_id = client.upload_file(filepath)
                if file_id:
                    result = client.execute_workflow(
                        workflow_id="daily_analysis",
                        inputs={"file_id": file_id}
                    )
                    monitor.update(True)
                    logger.log_operation(
                        "process_report",
                        "success",
                        {"filename": filename}
                    )
            except Exception as e:
                monitor.update(False, str(e))
                logger.log_operation(
                    "process_report",
                    "failed",
                    {"error": str(e)}
                )
    print(f"处理完成，成功率: {monitor.get_stats()['completion_rate']:.2%}")
```

### 5.2 定时任务集成

使用APScheduler实现定时执行。

```python
from apscheduler.schedulers.blocking import BlockingScheduler

def setup_scheduler(client: DifyAutomationClient):
    scheduler = BlockingScheduler()

    @scheduler.scheduled_job("cron", hour=2, minute=30)
    def nightly_processing():
        process_daily_reports(client, "/data/reports")

    scheduler.start()
```
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2461799.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!