FastAPI 安全认证
FastAPI 安全认证学习笔记一、认证流程概览FastAPI 的认证通常遵循以下流程客户端发送请求携带凭证如 Token、Cookie。中间件/依赖拦截请求提取凭证。验证逻辑校验凭证有效性如 JWT 签名、密码比对。注入用户将用户信息注入到request.state.user或作为参数传递。路由处理基于用户信息执行业务逻辑。二、OAuth2 with Password Flow (JWT)这是 FastAPI 官方推荐的标准认证方式基于OAuth2 密码流和JWT (JSON Web Token)。1. 核心依赖pipinstallpython-jose[cryptography]passlib[bcrypt]python-jose: 用于生成和验证 JWT。passlib: 用于密码哈希bcrypt。2. 完整实现示例A. 配置与工具函数fromdatetimeimportdatetime,timedeltafromtypingimportOptionalfromjoseimportJWTError,jwtfrompasslib.contextimportCryptContextfromfastapiimportDepends,HTTPException,statusfromfastapi.securityimportOAuth2PasswordBearer,OAuth2PasswordRequestFormfrompydanticimportBaseModel# ---- 配置 ----SECRET_KEYyour-secret-key# 生产环境应从环境变量读取ALGORITHMHS256ACCESS_TOKEN_EXPIRE_MINUTES30# ---- 密码哈希 ----pwd_contextCryptContext(schemes[bcrypt],deprecatedauto)oauth2_schemeOAuth2PasswordBearer(tokenUrltoken)# ---- 模拟数据库 ----fake_users_db{johndoe:{username:johndoe,full_name:John Doe,email:johndoeexample.com,hashed_password:pwd_context.hash(secret),disabled:False,}}# ---- 数据模型 ----classToken(BaseModel):access_token:strtoken_type:strclassTokenData(BaseModel):username:Optional[str]NoneclassUser(BaseModel):username:stremail:Optional[str]Nonefull_name:Optional[str]Nonedisabled:Optional[bool]None# ---- 工具函数 ----defverify_password(plain_password,hashed_password):returnpwd_context.verify(plain_password,hashed_password)defget_password_hash(password):returnpwd_context.hash(password)defget_user(db,username:str):ifusernameindb:user_dictdb[username]returnUser(**user_dict)returnNonedefauthenticate_user(fake_db,username:str,password:str):userget_user(fake_db,username)ifnotuser:returnFalseifnotverify_password(password,user.hashed_password):returnFalsereturnuserdefcreate_access_token(data:dict,expires_delta:Optional[timedelta]None):to_encodedata.copy()ifexpires_delta:expiredatetime.utcnow()expires_deltaelse:expiredatetime.utcnow()timedelta(minutes15)to_encode.update({exp:expire})encoded_jwtjwt.encode(to_encode,SECRET_KEY,algorithmALGORITHM)returnencoded_jwt# ---- 依赖注入获取当前用户 ----asyncdefget_current_user(token:strDepends(oauth2_scheme)):credentials_exceptionHTTPException(status_codestatus.HTTP_401_UNAUTHORIZED,detailCould not validate credentials,headers{WWW-Authenticate:Bearer},)try:payloadjwt.decode(token,SECRET_KEY,algorithms[ALGORITHM])username:strpayload.get(sub)ifusernameisNone:raisecredentials_exception token_dataTokenData(usernameusername)exceptJWTError:raisecredentials_exception userget_user(fake_users_db,usernametoken_data.username)ifuserisNone:raisecredentials_exceptionreturnuserasyncdefget_current_active_user(current_user:UserDepends(get_current_user)):ifcurrent_user.disabled:raiseHTTPException(status_code400,detailInactive user)returncurrent_userB. 路由定义fromfastapiimportFastAPI,Depends,HTTPException,status appFastAPI()app.post(/token,response_modelToken)asyncdeflogin_for_access_token(form_data:OAuth2PasswordRequestFormDepends()):userauthenticate_user(fake_users_db,form_data.username,form_data.password)ifnotuser:raiseHTTPException(status_codestatus.HTTP_401_UNAUTHORIZED,detailIncorrect username or password,headers{WWW-Authenticate:Bearer},)access_token_expirestimedelta(minutesACCESS_TOKEN_EXPIRE_MINUTES)access_tokencreate_access_token(data{sub:user.username},expires_deltaaccess_token_expires)return{access_token:access_token,token_type:bearer}app.get(/users/me/,response_modelUser)asyncdefread_users_me(current_user:UserDepends(get_current_active_user)):returncurrent_userapp.get(/users/me/items/)asyncdefread_own_items(current_user:UserDepends(get_current_active_user)):return[{item_id:Foo,owner:current_user.username}]3. 关键点解析组件作用OAuth2PasswordBearer定义 Token 获取方式/token自动从Authorization: Bearer token提取 Token。pwd_context使用 bcrypt 算法哈希密码防止明文存储。create_access_token生成 JWT包含sub(subject, 通常是用户名) 和exp(过期时间)。get_current_user依赖函数解码 Token 并验证用户是否存在。OAuth2PasswordRequestForm自动解析application/x-www-form-urlencoded表单数据用户名/密码。三、API Key 认证适用于服务间调用或简单场景。1. 自定义 Header 提取fromfastapiimportHeader,HTTPException,statusfromtypingimportOptional API_KEYsuper-secret-api-keyasyncdefverify_api_key(x_api_key:strHeader(...)):ifx_api_key!API_KEY:raiseHTTPException(status_codestatus.HTTP_403_FORBIDDEN,detailInvalid API Key)app.get(/items/)asyncdefread_items(api_key:strDepends(verify_api_key)):return{items:[Item 1,Item 2]}2. 使用APIKeyHeader(推荐)fromfastapiimportAPIRouter,Header,HTTPException,statusfromfastapi.securityimportAPIKeyHeader api_key_headerAPIKeyHeader(nameX-API-Key,auto_errorFalse)asyncdefget_api_key(api_key:strDepends(api_key_header)):ifnotapi_key:raiseHTTPException(status_code403,detailAPI Key missing)ifapi_key!API_KEY:raiseHTTPException(status_code403,detailInvalid API Key)returnapi_keyapp.get(/secure/)asyncdefsecure_endpoint(api_key:strDepends(get_api_key)):return{msg:Access granted}四、HTTP Basic Auth适用于简单的用户名/密码认证不推荐用于生产环境除非配合 HTTPS。fromfastapi.securityimportHTTPBasic,HTTPBasicCredentialsimportsecrets securityHTTPBasic()defverify_password(username:str,password:str)-bool:# 实际项目中应查询数据库并验证哈希returnusernameadminandpasswordsecretapp.get(/users/me)asyncdefget_current_user(credentials:HTTPBasicCredentialsDepends(security)):ifnotverify_password(credentials.username,credentials.password):fromfastapiimportHTTPException,statusraiseHTTPException(status_codestatus.HTTP_401_UNAUTHORIZED,detailIncorrect email or password,headers{WWW-Authenticate:Basic},)return{username:credentials.username}五、角色权限控制 (RBAC)在用户信息中增加角色字段实现权限控制。1. 扩展用户模型classUser(BaseModel):username:strrole:str# admin, user, guest# ... 其他字段2. 创建权限依赖fromfunctoolsimportwrapsdefrequire_role(required_role:str):asyncdefrole_checker(current_user:UserDepends(get_current_active_user)):ifcurrent_user.role!required_role:raiseHTTPException(status_codestatus.HTTP_403_FORBIDDEN,detailOperation not permitted)returncurrent_userreturnrole_checkerapp.delete(/users/{user_id})asyncdefdelete_user(user_id:int,current_user:UserDepends(require_role(admin))):return{msg:fUser{user_id}deleted by{current_user.username}}六、安全最佳实践1. 密码存储必须使用强哈希算法如bcrypt,argon2。禁止明文存储密码。使用passlib库简化哈希操作。2. JWT 安全密钥强度SECRET_KEY必须足够长且随机建议 32 字节以上。过期时间设置合理的exp避免 Token 长期有效。算法使用HS256或RS256避免none算法。存储前端应将 Token 存储在HttpOnly Cookie或localStorage需防范 XSS。3. HTTPS必须在生产环境启用 HTTPS防止 Token 在传输中被窃听。使用HTTPSRedirectMiddleware强制跳转。4. 防止暴力破解限制登录尝试次数如使用slowapi限流。登录失败时返回通用错误信息“Invalid username or password”避免泄露用户是否存在。5. CORS 与认证如果allow_credentialsTrueallow_origins不能为[*]。确保 CORS 配置正确防止跨站请求伪造CSRF。七、完整项目结构建议project/ ├── app/ │ ├── __init__.py │ ├── main.py # 入口配置中间件 │ ├── config.py # 配置项 (SECRET_KEY, DB_URL) │ ├── security.py # 认证逻辑 (JWT, 密码哈希) │ ├── dependencies.py # 依赖注入 (get_current_user) │ ├── models.py # 数据模型 (User, Token) │ ├── routers/ │ │ ├── auth.py # 登录/注册路由 │ │ └── users.py # 受保护的路由 │ └── database.py # 数据库连接 ├── requirements.txt └── .env # 敏感配置八、常见问题排查问题原因解决方案401 UnauthorizedToken 无效/过期/缺失检查Authorization头格式确认 Token 未过期403 Forbidden权限不足检查用户角色确认require_role逻辑JWTDecodeError密钥错误或算法不匹配确认SECRET_KEY和ALGORITHM一致Invalid signatureToken 被篡改检查密钥是否泄露确认签名算法CORS error跨域配置错误检查allow_origins和allow_credentials九、总结FastAPI 提供了强大的安全机制核心在于依赖注入将认证逻辑解耦复用性强。OAuth2 标准支持多种认证流兼容性好。Pydantic 模型自动校验数据格式减少错误。中间件全局处理安全策略如 HTTPS、CORS。推荐实践使用JWT OAuth2 Password Flow作为主要认证方式。配合bcrypt存储密码。生产环境强制HTTPS。使用依赖注入实现细粒度权限控制。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2592937.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!