Flask与Celery 项目应用(shared_task使用)

news2025/7/28 2:01:52

目录

    • 1. 项目概述
      • 主要功能
      • 技术栈
    • 2. 项目结构
    • 3. 环境设置
      • 创建虚拟环境并安装依赖
      • 主要依赖
    • 4. 应用配置
      • Flask应用初始化 (`__init__.py`)
      • Celery应用初始化 (`make_celery.py`)
    • 5. 定义Celery任务 (`tasks.py`)
      • 任务说明
    • 6. 创建API端点 (`views.py`)
      • API端点说明
    • 7. 前端界面 (`index.html`)
      • 前端JavaScript实现
    • 8. 运行应用
    • 用docker启动redis
      • 启动Celery Worker
      • 启动Flask开发服务器
    • 9. 关键概念解析
      • Celery任务装饰器
      • 任务ID
      • 任务状态查询

1. 项目概述

本教程将指导您创建一个Flask应用,该应用使用Celery处理后台任务。我们将构建一个简单的Web界面,允许用户提交三种不同类型的任务,并通过JavaScript轮询查看任务结果。

主要功能

  • 简单计算任务:将两个数字相加
  • 阻塞任务:模拟长时间运行的任务
  • 进度报告任务:显示任务执行进度

技术栈

  • Flask: Web框架
  • Celery: 分布式任务队列
  • Redis: 消息代理和结果后端
  • JavaScript: 前端交互和轮询

2. 项目结构

flask_celery_app/
├── .venv/                  # 虚拟环境
├── README.md              # 项目说明
├── make_celery.py         # Celery应用初始化
├── pyproject.toml         # 项目配置
├── requirements.txt       # 依赖列表
└── src/
    └── task_app/
        ├── __init__.py    # Flask应用初始化
        ├── tasks.py       # Celery任务定义
        ├── views.py       # Flask视图和API
        └── templates/
            └── index.html  # 前端界面

3. 环境设置

创建虚拟环境并安装依赖

uv venv
Using CPython 3.12.10
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
uv pip install -r requirements.txt 
uv pip install -e .

主要依赖

  • Flask
  • Celery
  • Redis

4. 应用配置

Flask应用初始化 (__init__.py)

from celery import Celery
from celery import Task
from flask import Flask
from flask import render_template


def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)

    @app.route("/")
    def index() -> str:
        return render_template("index.html")

    from . import views

    app.register_blueprint(views.bp)
    return app


def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

Celery应用初始化 (make_celery.py)

from task_app import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

5. 定义Celery任务 (tasks.py)

import time

from celery import shared_task
from celery import Task


@shared_task(ignore_result=False)
def add(a: int, b: int) -> int:
    return a + b


@shared_task()
def block() -> None:
    time.sleep(5)


@shared_task(bind=True, ignore_result=False)
def process(self: Task, total: int) -> object:
    for i in range(total):
        self.update_state(state="PROGRESS", meta={"current": i + 1, "total": total})
        time.sleep(1)

    return {"current": total, "total": total}

任务说明

  1. add任务: 简单的加法计算,设置ignore_result=False以保存结果
  2. block任务: 模拟耗时操作,不返回结果
  3. process任务: 带进度报告的任务,使用update_state更新进度

6. 创建API端点 (views.py)

from celery.result import AsyncResult
from flask import Blueprint
from flask import request

from . import tasks

bp = Blueprint("tasks", __name__, url_prefix="/tasks")


@bp.get("/result/<id>")
def result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    ready = result.ready()
    return {
        "ready": ready,
        "successful": result.successful() if ready else None,
        "value": result.get() if ready else result.result,
    }


@bp.post("/add")
def add() -> dict[str, object]:
    a = request.form.get("a", type=int)
    b = request.form.get("b", type=int)
    result = tasks.add.delay(a, b)
    return {"result_id": result.id}


@bp.post("/block")
def block() -> dict[str, object]:
    result = tasks.block.delay()
    return {"result_id": result.id}


@bp.post("/process")
def process() -> dict[str, object]:
    result = tasks.process.delay(total=request.form.get("total", type=int))
    return {"result_id": result.id}

API端点说明

  1. /tasks/result/: 获取任务结果
  2. /tasks/add: 提交加法任务
  3. /tasks/block: 提交阻塞任务
  4. /tasks/process: 提交带进度的任务

7. 前端界面 (index.html)

<!doctype html>
<html>
<head>
  <meta charset=UTF-8>
  <title>Celery Example</title>
</head>
<body>
<h2>Celery Example</h2>
Execute background tasks with Celery. Submits tasks and shows results using JavaScript.

<hr>
<h4>Add</h4>
<p>Start a task to add two numbers, then poll for the result.
<form id=add method=post action="{{ url_for("tasks.add") }}">
  <label>A <input type=number name=a value=4></label><br>
  <label>B <input type=number name=b value=2></label><br>
  <input type=submit>
</form>
<p>Result: <span id=add-result></span></p>

<hr>
<h4>Block</h4>
<p>Start a task that takes 5 seconds. However, the response will return immediately.
<form id=block method=post action="{{ url_for("tasks.block") }}">
  <input type=submit>
</form>
<p id=block-result></p>

<hr>
<h4>Process</h4>
<p>Start a task that counts, waiting one second each time, showing progress.
<form id=process method=post action="{{ url_for("tasks.process") }}">
  <label>Total <input type=number name=total value="10"></label><br>
  <input type=submit>
</form>
<p id=process-result></p>

<script>
  const taskForm = (formName, doPoll, report) => {
    document.forms[formName].addEventListener("submit", (event) => {
      event.preventDefault()
      fetch(event.target.action, {
        method: "POST",
        body: new FormData(event.target)
      })
        .then(response => response.json())
        .then(data => {
          report(null)

          const poll = () => {
            fetch(`/tasks/result/${data["result_id"]}`)
              .then(response => response.json())
              .then(data => {
                report(data)

                if (!data["ready"]) {
                  setTimeout(poll, 500)
                } else if (!data["successful"]) {
                  console.error(formName, data)
                }
              })
          }

          if (doPoll) {
            poll()
          }
        })
    })
  }

  taskForm("add", true, data => {
    const el = document.getElementById("add-result")

    if (data === null) {
      el.innerText = "submitted"
    } else if (!data["ready"]) {
      el.innerText = "waiting"
    } else if (!data["successful"]) {
      el.innerText = "error, check console"
    } else {
      el.innerText = data["value"]
    }
  })

  taskForm("block", false, data => {
    document.getElementById("block-result").innerText = (
      "request finished, check celery log to see task finish in 5 seconds"
    )
  })

  taskForm("process", true, data => {
    const el = document.getElementById("process-result")

    if (data === null) {
      el.innerText = "submitted"
    } else if (!data["ready"]) {
      el.innerText = `${data["value"]["current"]} / ${data["value"]["total"]}`
    } else if (!data["successful"]) {
      el.innerText = "error, check console"
    } else {
      el.innerText = "✅ done"
    }
    console.log(data)
  })

</script>
</body>
</html>

前端JavaScript实现

前端使用JavaScript发送任务请求并轮询结果:

  1. 提交表单时阻止默认行为,使用fetch API发送POST请求
  2. 获取任务ID后,定期轮询/tasks/result/<id>端点
  3. 根据任务状态更新UI

8. 运行应用

用docker启动redis

docker run --name redis-server -p 6379:6379 -d redis

启动Celery Worker

# 在第一个终端窗口
celery -A make_celery worker --loglevel INFO

在这里插入图片描述

启动Flask开发服务器

# 在第二个终端窗口
flask -A task_app run --debug

在这里插入图片描述

访问 http://localhost:5000/ 使用应用。
在这里插入图片描述

9. 关键概念解析

Celery任务装饰器

  • @shared_task(ignore_result=False): 创建可共享的任务,并保存结果
  • bind=True: 将任务实例作为第一个参数传递给任务函数

任务ID

  • 每个任务都有一个自动生成的唯一ID
  • 通过result.id获取,用于后续查询任务状态

任务状态查询

  • AsyncResult(id): 通过ID获取任务结果对象
  • result.ready(): 检查任务是否完成
  • result.successful(): 检查任务是否成功完成
  • result.get(): 获取任务结果

项目链接:https://github.com/pallets/flask/tree/main/examples/celery
可用downgit单独下载项目中某个文件夹:https://minhaskamal.github.io/DownGit/#/home?url=https:%2F%2Fgithub.com%2Fpallets%2Fflask%2Ftree%2Fmain%2Fexamples%2Fcelery

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405932.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树-226.翻转链表-力扣(LeetCode)

一、题目解析 翻转可以理解为树的左右子树交换&#xff0c;从根到叶子节点&#xff0c;但是这里交换的是链接的指针&#xff0c;而不是单纯的交换值&#xff0c;当出现nullptr时&#xff0c;也是可以交换链接的&#xff0c;交换值的话就不行了。 二、算法原理 依旧的递归&…

HarmonyOS Next 弹窗系列教程(3)

HarmonyOS Next 弹窗系列教程&#xff08;3&#xff09; 选择器弹窗 (PickerDialog) 介绍 选择器弹窗通常用于在用户进行某些操作&#xff08;如点击按钮&#xff09;时显示特定的信息或选项。让用户可以进行选择提供的固定的内容。 以下内容都属于选择器弹窗&#xff1a; …

【docker】Windows安装docker

环境及工具&#xff08;点击下载&#xff09; Docker Desktop Installer.exe &#xff08;windows 环境下运行docker的一款产品&#xff09; wsl_update_x64 &#xff08;Linux 内核包&#xff09; 前期准备 系统要求2&#xff1a; Windows 11&#xff1a;64 位系统&am…

无人机避障——感知部分(Ubuntu 20.04 复现Vins Fusion跑数据集)胎教级教程

硬件环境&#xff1a;NVIDIA Jeston Orin nx 系统&#xff1a;Ubuntu 20.04 任务&#xff1a;跑通 EuRoC MAV Dataset 数据集 展示结果&#xff1a; 编译Vins Fusion 创建工作空间vins_ws # 创建目录结构 mkdir -p ~/vins_ws/srccd ~/vins_ws/src# 初始化工作空间&#xf…

如何安装并使用RustDesk

参考&#xff1a; 搭建 RustDesk Server&#xff1a;打造属于自己的远程控制系统&#xff0c;替代 TeamViewer 和 ToDesk&#xff01; 向日葵、ToDesk再见&#xff01;自己动手&#xff0c;自建RustDesk远程服务器真香&#xff01; 通俗易懂&#xff1a;RustDesk Server的搭…

机器学习——随机森林算法

随机森林算法是一种强大的树集成算法&#xff0c;比使用单个决策树效果要好得多。 以下是生成树集成的方法&#xff1a;假设有一个大小为m的训练集&#xff0c;然后对于b1到B&#xff0c;所以执行B次&#xff0c;可以使用有放回抽样来创建一个大小为m的训练集。所以如果有10个…

【从零学习JVM|第二篇】字节码文件

前言&#xff1a; 通过了解字节码文件可以帮助我们更容易的理解JVM的工作原理&#xff0c;所以接下来&#xff0c;我们来介绍一下字节码文件。 目录 前言&#xff1a; 正确的打开字节码文件 字节码文件组成 1. 魔数&#xff08;Magic Number&#xff09; 2. 版本号&…

Fractal Generative Models论文阅读笔记与代码分析

何恺明分型模型这篇文章在二月底上传到arXiv预出版网站到现在已经过了三个月&#xff0c;当时我也听说这篇文章时感觉是大有可为&#xff0c;但是几个月不知道忙啥了&#xff0c;可能错过很多机会&#xff0c;但是亡羊补牢嘛&#xff0c;而且截至目前&#xff0c;该文章应该也还…

OGG-01635 OGG-15149 centos服务器远程抽取AIX oracle11.2.0.4版本

背景描述 有一套ogg远程抽取的环境&#xff0c;源端是AIX7.1环境的oracle 11.2.0.4版本的数据库&#xff0c;中间是OGG抽取服务器&#xff0c;目标端是centos 7.9环境的oracle 19c。 采用集成模式远程抽取源端数据正常&#xff0c;但是经典模式远程抽取源数据的时候抽取进程启…

Kotlin REPL初探

文章目录 1. Kotlin REPL 简介2. 在命令行中玩Kotlin REPL2.1 下载Kotlin编译器压缩包2.2 安装配置Kotlin编译器2.3 启动Kotlin交互式环境2.4 在命令行玩Kotlin REPL 3. 在IDEA里玩Kotlin REPL3.1 打开Kotlin REPL窗口3.2 在Kotlin REPL窗口玩代码 4. Kotlin REPL 的优势 1. Ko…

git引用概念(git reference,git ref)(简化对复杂SHA-1哈希值的管理)(分支引用、标签引用、HEAD引用、远程引用、特殊引用)

文章目录 **引用的本质**1. **引用是文件**2. **引用的简化作用** **引用的类型**1. **分支引用&#xff08;Branch References&#xff09;**2. **标签引用&#xff08;Tag References&#xff09;**3. **HEAD 引用**4. **远程引用&#xff08;Remote References&#xff09;*…

Github 2025-06-07 Rust开源项目日报Top10

根据Github Trendings的统计,今日(2025-06-07统计)共有10个项目上榜。根据开发语言中项目的数量,汇总情况如下: 开发语言项目数量Rust项目10Dart项目1TypeScript项目1RustDesk: 用Rust编写的开源远程桌面软件 创建周期:1218 天开发语言:Rust, Dart协议类型:GNU Affero Ge…

自动化立体仓库堆垛机控制系统STEP7 OB1功能块

1、堆垛机控制系统STEP7硬件组态如下图 CPU CPU 314C-2 PN/DP 6ES7 314-6EH04-0AB0 SM 338 POS-INPUT AO2x12Bit 6ES7 332-5HB01-0AB0 2、堆垛机控制系统STEP7内部变量 前进HMI M 0.0 BOOL 后退HMI M 0.1 BOOL 上升HMI M 0.2 B…

MATLAB生成大规模无线通信网络拓扑(任意节点数量)

功能&#xff1a; 生成任意节点数量的网络拓扑&#xff0c;符合现实世界节点空间分布和连接规律 效果&#xff1a; 30节点&#xff1a; 100节点&#xff1a; 500节点&#xff1a; 程序&#xff1a; %创建时间&#xff1a;2025年6月8日 %zhouzhichao %自然生长出n节点的网络% …

ubuntu 20.04挂载固态硬盘

我们有个工控机&#xff0c;其操作系统是ubuntu 20.04。可以接入一个固态硬盘。将固态硬盘插好后&#xff0c;就要进行挂载。在AI的指导下&#xff0c;过程并不顺利。记录如下&#xff1a; 1、检查硬盘是否被识别 安装好硬盘后&#xff0c;运行以下命令来检查Linux系统是否…

PC与Windows远程连接与串流:方案简介(ZeroTier + Parsec、Moonlight + Sunshine、网易UU远程)

简介 在远程办公、云游戏、家用 NAS 串流、图形远程渲染等需求增长的背景下&#xff0c;越来越多用户开始寻找低延迟、高画质、跨网络可用的远程连接方案。今天这篇文章将深度分析三种目前在玩家圈和远程办公中都非常流行的组合方案&#xff1a; &#x1f7e2; ZeroTier Pars…

SpringBoot+MySQL家政服务平台 设计开发

概述 基于SpringBootMySQL开发的家政服务平台完整项目&#xff0c;该系统实现了用户预约、服务管理、订单统计等核心功能&#xff0c;采用主流技术栈开发&#xff0c;代码规范且易于二次开发。 主要内容 系统功能架构 本系统采用前后端分离架构&#xff0c;前端提供用户交互…

浏览器兼容-polyfill-本地服务-优化

babel和webpack结合 npx babel src --out-dir dist --presetsbabel/preset-env 这是把src下面的东西都用babel转化一下 webpack可以和babel结合使用&#xff0c;首先下载一个这东西&#xff1a; npm install babel-loader -D webpack配置&#xff1a; const path requir…

c++ decltype关键字

decltype为类型推导关键字。 示例代码&#xff1a; // decltype也可用于函数模板编程: template<typename T, typename U> auto add(T t, U u) -> decltype(t u) {return t u; }// decltype推导函数返回类型 auto doubleNumFunc(int x) -> decltype(x * 2) {ret…

Selenium自动化测试工具安装和使用(PyCharm)

一&#xff0c;了解驱动 手工测试我们很了解&#xff0c;假设我要测试百度首页是否正常&#xff0c;只需要鼠标点击打开浏览器&#xff0c;然后输入百度网址即可 但是对于程序来说&#xff0c;打开浏览器&#xff0c;需要用到对应的驱动&#xff0c;就好比你给电脑装了个外置…