Flask + Celery 应用

news2025/6/6 7:10:42

目录

    • Flask + Celery 应用
      • 项目结构
      • 1. 创建app.py
      • 2. 创建tasks.py
      • 3. 创建celery_worker.py
      • 4. 创建templates目录和index.html
      • 运行应用
      • 测试文件

Flask + Celery 应用

对于Flask与Celery结合的例子,需要创建几个文件。首先安装必要的依赖:

pip install flask celery redis requests

项目结构

让我们创建以下文件结构:

flask_celery/
├── app.py          # Flask应用
├── celery_worker.py # Celery worker
├── tasks.py        # Celery任务定义
└── templates/      # 模板目录
    └── index.html  # 主页模板

1. 创建app.py

from flask import Flask, request, render_template, jsonify
from celery_worker import celery
import tasks
import time

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/process', methods=['POST'])
def process():
    # 获取表单数据
    data = request.form.get('data', '')
    
    # 启动异步任务
    task = tasks.process_data.delay(data)
    
    return jsonify({
        'task_id': task.id,
        'status': '任务已提交',
        'message': f'正在处理数据: {data}'
    })

@app.route('/status/<task_id>')
def task_status(task_id):
    # 获取任务状态
    task = tasks.process_data.AsyncResult(task_id)
    
    if task.state == 'PENDING':
        response = {
            'state': task.state,
            'status': '任务等待中...'
        }
    elif task.state == 'FAILURE':
        response = {
            'state': task.state,
            'status': '任务失败',
            'error': str(task.info)
        }
    else:
        response = {
            'state': task.state,
            'status': '任务完成' if task.state == 'SUCCESS' else '任务处理中',
            'result': task.result if task.state == 'SUCCESS' else None
        }
    
    return jsonify(response)

if __name__ == '__main__':
    app.run(debug=True)

2. 创建tasks.py

from celery_worker import celery
import time

@celery.task()
def process_data(data):
    # 模拟耗时操作
    time.sleep(5)
    
    # 处理数据(这里只是简单地转换为大写)
    result = data.upper()
    
    return {
        'original_data': data,
        'processed_data': result,
        'processing_time': '5秒'
    }

3. 创建celery_worker.py

from celery import Celery

# 创建Celery实例
celery = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)

# 配置Celery
celery.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
)

4. 创建templates目录和index.html

首先创建templates目录:

mkdir templates

然后创建index.html文件:

<!DOCTYPE html>
<html>
<head>
    <title>Flask + Celery 示例</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 40px; line-height: 1.6; }
        h1 { color: #4285f4; }
        .container { max-width: 800px; margin: 0 auto; }
        .form-group { margin-bottom: 15px; }
        input[type="text"] { padding: 8px; width: 300px; }
        button { padding: 8px 15px; background-color: #4285f4; color: white; border: none; cursor: pointer; }
        button:hover { background-color: #3b78e7; }
        #result { margin-top: 20px; padding: 15px; background-color: #f5f5f5; border-radius: 5px; display: none; }
        #status { margin-top: 10px; font-style: italic; }
    </style>
</head>
<body>
    <div class="container">
        <h1>Flask + Celery 异步任务示例</h1>
        
        <div class="form-group">
            <label for="data">输入要处理的数据:</label><br>
            <input type="text" id="data" name="data" placeholder="输入一些文本...">
            <button onclick="submitTask()">提交任务</button>
        </div>
        
        <div id="status"></div>
        
        <div id="result">
            <h3>任务结果:</h3>
            <pre id="result-data"></pre>
        </div>
    </div>

    <script>
        function submitTask() {
            const data = document.getElementById('data').value;
            if (!data) {
                alert('请输入数据!');
                return;
            }
            
            const statusDiv = document.getElementById('status');
            statusDiv.textContent = '提交任务中...';
            
            // 提交任务
            fetch('/process', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                body: `data=${encodeURIComponent(data)}`
            })
            .then(response => response.json())
            .then(data => {
                statusDiv.textContent = data.status;
                
                // 开始轮询任务状态
                pollTaskStatus(data.task_id);
            })
            .catch(error => {
                statusDiv.textContent = `错误: ${error}`;
            });
        }
        
        function pollTaskStatus(taskId) {
            const statusDiv = document.getElementById('status');
            const resultDiv = document.getElementById('result');
            const resultDataPre = document.getElementById('result-data');
            
            // 定期检查任务状态
            const interval = setInterval(() => {
                fetch(`/status/${taskId}`)
                .then(response => response.json())
                .then(data => {
                    statusDiv.textContent = `状态: ${data.status}`;
                    
                    if (data.state === 'SUCCESS') {
                        clearInterval(interval);
                        resultDiv.style.display = 'block';
                        resultDataPre.textContent = JSON.stringify(data.result, null, 2);
                    } else if (data.state === 'FAILURE') {
                        clearInterval(interval);
                        statusDiv.textContent = `错误: ${data.error}`;
                    }
                })
                .catch(error => {
                    clearInterval(interval);
                    statusDiv.textContent = `轮询错误: ${error}`;
                });
            }, 1000);
        }
    </script>
</body>
</html>

运行应用

  1. 首先,确保Redis服务器正在运行(Celery需要它作为消息代理)可以用docker启动:
docker run --name redis-server -p 6379:6379 -d redis

在这里插入图片描述

  1. 启动Celery worker:
celery -A tasks worker --loglevel=info

在这里插入图片描述

  1. 在另一个终端中启动Flask应用:
python app.py

在这里插入图片描述

测试文件

创建一个新文件test_app.py

import requests
import time
import json

# 应用服务器地址
BASE_URL = 'http://localhost:5000'

def test_process_endpoint():
    """测试/process端点"""
    print("测试1: 提交任务处理")
    
    # 准备测试数据
    test_data = "hello world"
    
    # 发送POST请求到/process端点
    response = requests.post(
        f"{BASE_URL}/process",
        data={"data": test_data}
    )
    
    # 检查响应状态码
    if response.status_code == 200:
        print("✓ 状态码正确: 200")
    else:
        print(f"✗ 状态码错误: {response.status_code}")
    
    # 解析响应JSON
    result = response.json()
    
    # 检查响应内容
    if 'task_id' in result:
        print(f"✓ 成功获取任务ID: {result['task_id']}")
        return result['task_id']
    else:
        print("✗ 未能获取任务ID")
        return None

def test_status_endpoint(task_id):
    """测试/status/<task_id>端点"""
    print("\n测试2: 检查任务状态")
    
    if not task_id:
        print("✗ 无法测试状态端点: 缺少任务ID")
        return
    
    # 轮询任务状态,最多等待10秒
    max_attempts = 10
    for attempt in range(1, max_attempts + 1):
        print(f"\n轮询 {attempt}/{max_attempts}...")
        
        # 发送GET请求到/status/<task_id>端点
        response = requests.get(f"{BASE_URL}/status/{task_id}")
        
        # 检查响应状态码
        if response.status_code == 200:
            print("✓ 状态码正确: 200")
        else:
            print(f"✗ 状态码错误: {response.status_code}")
            continue
        
        # 解析响应JSON
        result = response.json()
        print(f"当前状态: {result.get('state', '未知')}")
        
        # 如果任务完成,显示结果并退出循环
        if result.get('state') == 'SUCCESS':
            print("\n✓ 任务成功完成!")
            print("结果:")
            print(json.dumps(result.get('result', {}), indent=2, ensure_ascii=False))
            return True
        
        # 如果任务失败,显示错误并退出循环
        if result.get('state') == 'FAILURE':
            print(f"\n✗ 任务失败: {result.get('error', '未知错误')}")
            return False
        
        # 等待1秒后再次检查
        time.sleep(1)
    
    print("\n✗ 超时: 任务未在预期时间内完成")
    return False

def test_index_endpoint():
    """测试首页端点"""
    print("\n测试3: 访问首页")
    
    # 发送GET请求到/端点
    response = requests.get(BASE_URL)
    
    # 检查响应状态码
    if response.status_code == 200:
        print("✓ 状态码正确: 200")
        print("✓ 成功访问首页")
    else:
        print(f"✗ 状态码错误: {response.status_code}")

def run_all_tests():
    """运行所有测试"""
    print("开始测试Flask+Celery应用...\n")
    
    # 测试1: 提交任务
    task_id = test_process_endpoint()
    
    # 测试2: 检查任务状态
    if task_id:
        test_status_endpoint(task_id)
    
    # 测试3: 访问首页
    test_index_endpoint()
    
    print("\n测试完成!")

if __name__ == "__main__":
    run_all_tests()

然后,在另一个终端中运行测试:

python test_app.py

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2398243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

奥威BI+AI数据分析:企业数智化转型的加速器

在当今数据驱动的时代&#xff0c;企业对于数据分析的需求日益增长。奥威BIAI数据分析的组合&#xff0c;正成为众多企业数智化转型的加速器。 奥威BI以其强大的数据处理和可视化能力著称。它能够轻松接入多种数据源&#xff0c;实现数据的快速整合与清洗。通过内置的ETL工具&…

python打卡day43

复习日 作业&#xff1a; kaggle找到一个图像数据集&#xff0c;用cnn网络进行训练并且用grad-cam做可视化 进阶&#xff1a;并拆分成多个文件 找了个街头食物图像分类的数据集Popular Street Foods&#xff08;其实写代码的时候就开始后悔了&#xff09;&#xff0c;原因在于&…

Linux --进程优先级

概念 什么是进程优先级&#xff0c;为什么需要进程优先级&#xff0c;怎么做到进程优先级这是本文需要解释清楚的。 优先级的本质其实就是排队&#xff0c;为了去争夺有限的资源&#xff0c;比如cpu的调度。cpu资源分配的先后性就是指进程的优先级。优先级高的进程有优先执行的…

安装和配置 Nginx 和 Mysql —— 一步一步配置 Ubuntu Server 的 NodeJS 服务器详细实录6

前言 昨天更新了四篇博客&#xff0c;我们顺利的 安装了 ubuntu server 服务器&#xff0c;并且配置好了 ssh 免密登录服务器&#xff0c;安装好了 服务器常用软件安装, 配置好了 zsh 和 vim 以及 通过 NVM 安装好Nodejs&#xff0c;还有PNPM包管理工具 。 作为服务器的运行…

图解gpt之注意力机制原理与应用

大家有没有注意到&#xff0c;当序列变长时&#xff0c;比如翻译一篇长文章&#xff0c;或者处理一个长句子&#xff0c;RNN这种编码器就有点力不从心了。它把整个序列信息压缩到一个固定大小的向量里&#xff0c;信息丢失严重&#xff0c;而且很难记住前面的细节&#xff0c;特…

【Oracle】视图

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. 视图基础概述1.1 视图的概念与特点1.2 视图的工作原理1.3 视图的分类 2. 简单视图2.1 创建简单视图2.1.1 基本简单视图2.1.2 带计算列的简单视图 2.2 简单视图的DML操作2.2.1 通过视图进行INSERT操作2.2.2 通…

更强劲,更高效:智源研究院开源轻量级超长视频理解模型Video-XL-2

长视频理解是多模态大模型关键能力之一。尽管OpenAI GPT-4o、Google Gemini等私有模型已在该领域取得显著进展&#xff0c;当前的开源模型在效果、计算开销和运行效率等方面仍存在明显短板。近日&#xff0c;智源研究院联合上海交通大学等机构&#xff0c;正式发布新一代超长视…

2025.6.3学习日记 Nginx 基本概念 配置 指令 文件

1.初始nginx Nginx&#xff08;发音为 “engine x”&#xff09;是一款高性能的开源 Web 服务器软件&#xff0c;同时也具备反向代理、负载均衡、邮件代理等功能。它由俄罗斯工程师 Igor Sysoev 开发&#xff0c;最初用于解决高并发场景下的性能问题&#xff0c;因其轻量级、高…

【连接器专题】案例:产品测试顺序表解读与应用

在查看SD卡座连接器的规格书,一些测试报告时,你可能会看到如下一张产品测试顺序表。为什么会出现一张测试顺序表呢? 测试顺序表的使用其实定义测试环节的验证的“路线图”和“游戏规则”,本文就以我人个经验带领大家一起看懂这张表并理解其设计逻辑。 测试顺序表结构 测试…

星动纪元的机器人大模型 VPP,泛化能力效果如何?与 VLA 技术的区别是什么?

点击上方关注 “终端研发部” 设为“星标”&#xff0c;和你一起掌握更多数据库知识 VPP 利用了大量互联网视频数据进行训练&#xff0c;直接学习人类动作&#xff0c;减轻了对于高质量机器人真机数据的依赖&#xff0c;且可在不同人形机器人本体之间自如切换&#xff0c;这有望…

4000万日订单背后,饿了么再掀即时零售的“效率革命”

当即时零售转向价值深耕&#xff0c;赢面就是综合实力的强弱。 文&#xff5c;郭梦仪 编&#xff5c;王一粟 在硝烟弥漫的外卖行业“三国杀”中&#xff0c;饿了么与淘宝闪购的日订单量竟然突破了4000万单。 而距淘宝闪购正式上线&#xff0c;还不到一个月。 在大额福利优惠…

入门AJAX——XMLHttpRequest(Get)

一、什么是 AJAX AJAX Asynchronous JavaScript And XML&#xff08;异步的 JavaScript 和 XML&#xff09;。 1、XML与异步JS XML: 是一种比较老的前后端数据传输格式&#xff08;已经几乎被 JSON 代替&#xff09;。它的格式与HTML类似&#xff0c;通过严格的闭合自定义标…

5分钟申请edu邮箱【方案本周有效】

这篇文章主要展示的是成果。如果你是第1次看见我的内容&#xff0c;具体的步骤请翻看往期的两篇作品。先看更正补全&#xff0c;再看下一个。 建议你边看边操作。 【更正补全】edu教育申请通过方案 本周 edu教育邮箱注册可行方案 #edu邮箱 伟大无需多言 我已经验证了四个了…

闲谈PMIC和SBC

今天不卷&#xff0c;简单写点。 在ECU设计里&#xff0c;供电芯片选型是逃不开的话题&#xff0c;所以聊聊PMIC或者SBC的各自特点&#xff0c;小小总结下。 PMIC&#xff0c;全称Power Management Intergrated Circuits&#xff0c;听名字就很专业&#xff1a;电源管理&…

Java垃圾回收机制深度解析:从理论到实践的全方位指南

Java垃圾回收(GC)是Java虚拟机(JVM)的核心功能&#xff0c;它自动管理内存分配与回收&#xff0c;避免了C/C中常见的内存泄漏问题。本文将深入剖析Java垃圾回收的工作原理、算法实现、收集器类型及调优策略&#xff0c;助你全面掌握JVM内存管理的精髓。 一、垃圾回收基础概念 …

论文阅读:CLIP:Learning Transferable Visual Models From Natural Language Supervision

从自然语言监督中学习可迁移的视觉模型 虽然有点data/gpu is all you need的味道&#xff0c;但是整体实验和谈论丰富度上还是很多的&#xff0c;也是一篇让我多次想放弃的文章&#xff0c;因为真的是非常长的原文和超级多的实验讨论&#xff0c;隔着屏幕感受到了实验的工作量之…

在图像分析算法部署中应对流行趋势的变化|文献速递-深度学习医疗AI最新文献

Title 题目 Navigating prevalence shifts in image analysis algorithm deployment 在图像分析算法部署中应对流行趋势的变化 01 文献速递介绍 机器学习&#xff08;ML&#xff09;已开始革新成像研究与实践的诸多领域。然而&#xff0c;医学图像分析领域存在显著的转化鸿…

CAMEL-AI开源自动化任务执行助手OWL一键整合包下载

OWL 是由 CAMEL-AI 团队开发的开源多智能体协作框架&#xff0c;旨在通过动态智能体交互实现复杂任务的自动化处理&#xff0c;在 GAIA 基准测试中以 69.09 分位列开源框架榜首&#xff0c;被誉为“Manus 的开源平替”。我基于当前最新版本制作了免安装一键启动整合包。 CAMEL-…

Linux系统-基本指令(5)

文章目录 mv 指令cat 指令&#xff08;查看小文件&#xff09;知识点&#xff08;简单阐述日志&#xff09;more 和 less 指令&#xff08;查看大文件&#xff09;head 和 tail 指令&#xff08;跟查看文件有关&#xff09;知识点&#xff08;管道&#xff09;时间相关的指令&a…

C# winform教程(二)

一、基础控件 常用的基础控件主要有按钮&#xff0c;文本&#xff0c;文本输入&#xff0c;组&#xff0c;进度条&#xff0c;等等。 基础控件 名称含义详细用法Button按钮Buttoncheckbox多选按钮Combobox下拉选择groupbox组控件label标签&#xff0c;显示文字panel控件集合&a…