项目架构:
FastAPI(folder)
>app(folder)
>core(folder)
>models(folder)
>routers(folder)
>utils(folder)
main.py(file)
1 utils文件夹下新建schedulers.py
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
# NOTE: the database is named explicitly here, so a database called
# `apscheduler_db` is created in MongoDB to hold the persisted jobs.
jobstores = {
    'default': MongoDBJobStore(
        database='apscheduler_db',
        collection='custom_jobs',
        host='localhost',
        port=27017,
    ),
}

# NOTE: `replace_existing` is an option of `scheduler.add_job()`, not of the
# scheduler constructor (where it is silently ignored). To overwrite a job
# whose id already exists in the job store, pass `replace_existing=True` in
# each `add_job()` call; it only resolves conflicts between identical job ids.
scheduler = BackgroundScheduler(jobstores=jobstores)
2 main.py中在lifespan上下文初始化和关闭scheduler
from contextlib import asynccontextmanager

import uvicorn
from fastapi import FastAPI

from app.utils.schedulers import jobstores
# Module-level placeholder kept for backward compatibility; `lifespan` works
# with its own locally imported scheduler instance.
scheduler = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Clear persisted jobs on startup and tear the scheduler down on exit.

    Code after ``yield`` runs when the application shuts down gracefully
    (for example on Ctrl+C). A hard kill of the process skips it, which is
    why stale jobs are also removed from the job store at startup.
    """
    from app.utils.schedulers import scheduler

    # Drop jobs left in the store by a previous run.
    jobstores['default'].remove_all_jobs()
    try:
        yield
    finally:
        # Remove jobs while the scheduler is still running so that they are
        # deleted from the job store as well, then stop the scheduler.
        scheduler.remove_all_jobs()
        # The scheduler is only started once a job has been created;
        # shutting down a scheduler that never started raises an error.
        if scheduler.running:
            scheduler.shutdown(wait=False)


app = FastAPI(lifespan=lifespan)
3 models文件夹下新建schedulers.py文件(文件名需与后文的 from app.models.schedulers import job_config 保持一致),配置基础参数类和默认值
from pydantic import BaseModel
# Parameters describing one scheduled job. Every field has a default, so
# `job_config()` with no arguments yields a usable configuration.
class job_config(BaseModel):
    # Passed to the scheduler as the job's `id`.
    job_id: str = "default"
    # Passed to the scheduler as the job's `name`.
    job_name: str = "default"
    # Passed to the scheduler as the `trigger` alias.
    trigger_type: str = "interval"
    # Passed to the scheduler as `kwargs`, i.e. keyword arguments handed to
    # the job function when it runs.
    trigger_kwargs: dict = {}
    # Passed to the scheduler as the trigger's `seconds` argument.
    seconds: int = 30
4 routers文件夹下新建schedulers.py,添加创建job的方法
from app.utils.schedulers import scheduler
from app.models.schedulers import job_config
from fastapi import APIRouter
from typing import Coroutine,Callable
from datetime import datetime
# Router on which the endpoints of this module are registered.
router = APIRouter()


# NOTE(review): this helper is also exposed as a GET endpoint, but its
# parameters are a config object and a callable, which are not natural query
# parameters -- confirm whether the route registration is really wanted.
@router.get("/create_job")
def create_job(jobconfig, func):
    """Register ``func`` as a scheduled job and make sure the scheduler runs.

    Args:
        jobconfig: A ``job_config`` instance describing the job, or ``None``
            to use the defaults of ``job_config()``.
        func: The callable the scheduler should run.

    Raises:
        Exception: Any error raised while adding the job or starting the
            scheduler propagates to the caller unchanged.
    """
    if jobconfig is None:
        jobconfig = job_config()
    try:
        scheduler.add_job(
            func,
            trigger=jobconfig.trigger_type,
            # Keyword arguments forwarded to `func`; custom parameters for
            # the job can be supplied here.
            kwargs=jobconfig.trigger_kwargs,
            id=jobconfig.job_id,
            name=jobconfig.job_name,
            seconds=jobconfig.seconds,
            # Overwrite a job with the same id instead of failing with an
            # id conflict when this function is called again.
            replace_existing=True,
        )
        # Starting an already running scheduler raises an error, so only
        # start it on the first call.
        if not scheduler.running:
            scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        # Stop the scheduler, then let the interrupt/exit request continue
        # instead of swallowing it.
        if scheduler.running:
            scheduler.shutdown()
        raise
5 测试,调用test方法
@router.get("/function")
def function1():
    """Append a timestamped line to ``D:\\demo.txt``.

    Used both as an HTTP endpoint and as the demo job function. Failures to
    open or write the file are logged rather than raised, so a missing drive
    or a permission problem does not break the caller.
    """
    import logging  # local import keeps this snippet self-contained

    try:
        # Explicit UTF-8: the message contains non-ASCII text, which the
        # platform's default encoding may not be able to represent.
        with open("D:\\demo.txt", "a", encoding="utf-8") as file:
            print("写入文件" + str(datetime.now()), file=file)
    except OSError:
        logging.getLogger(__name__).exception("Could not write to D:\\demo.txt")
@router.get("/test")
def test():
    """Schedule ``function1`` with the default job configuration.

    Errors raised while creating the job are logged instead of being
    silently discarded; the endpoint still returns normally in that case.
    """
    import logging  # local import keeps this snippet self-contained

    try:
        create_job(None, function1)
    except Exception:
        # `Exception` (not a bare except) so that KeyboardInterrupt and
        # SystemExit are not trapped here.
        logging.getLogger(__name__).exception("Failed to schedule function1")