Python+Selenium实现抖音博主批量监控:300+账号实时更新通知(附完整代码)
PythonSelenium构建高可用抖音博主监控系统从零到一的实战架构与性能调优最近在技术社群里经常看到有朋友在讨论如何批量追踪抖音博主的更新动态。无论是做内容分析、竞品研究还是个人兴趣追踪手动刷新几百个主页显然不现实。市面上虽然有一些现成的工具但要么功能受限要么价格不菲更重要的是缺乏透明度和自定义能力。作为一个长期和爬虫、自动化打交道的开发者我更倾向于自己动手搭建一套可控的监控系统。今天我想分享的就是如何用PythonSelenium为核心构建一个能够稳定监控300抖音博主更新并实现实时通知的完整解决方案。这套方案不仅仅是代码的堆砌更涉及到架构设计、异常处理、性能优化等多个层面的思考希望能给有类似需求的技术爱好者提供一个扎实的起点。1. 系统架构设计与核心思路在动手写代码之前清晰的架构设计能避免后期大量的重构工作。我们的目标系统需要具备几个核心能力身份认证与状态维持、博主列表的动态管理、高效且低侵入的轮询检测、可靠的通知与下载机制以及完善的错误恢复与日志记录。1.1 整体工作流设计整个系统的工作流可以抽象为一个闭环的生产者-消费者模型但这里我们采用更直接的事件驱动轮询模型。graph TD A[系统启动] -- B{首次运行?}; B -- 是 -- C[登录并获取完整关注列表]; B -- 否 -- D[加载本地缓存的博主列表]; C -- E[保存列表至本地JSON]; D -- F; E -- F[进入主轮询循环]; F -- G[遍历博主列表]; G -- H[请求博主主页API]; H -- I{作品数有变化?}; I -- 无变化 -- G; I -- 增加 -- J[获取作品详情并校验]; J -- K{符合过滤条件?}; K -- 是 -- L[触发通知 下载任务]; K -- 否 -- M[仅更新本地作品数]; L -- N[更新本地记录]; M -- N; N -- O[等待间隔继续下一轮];这个流程的关键在于状态对比。我们并不需要实时解析每个博主的全部视频列表而是通过对比前后两次查询到的“作品总数”这个轻量级指标来判断是否有更新。只有当总数增加时才触发更详细的查询和过滤流程这极大地减少了网络请求和数据处理的开销。1.2 技术栈选型与考量Selenium: 主要用于初始登录和Cookie获取。抖音的Web端登录涉及复杂的验证直接模拟请求难度极大。Selenium驱动真实浏览器可以完美解决这个问题获取到登录后的有效Cookie供后续的requests库使用。Requests: 作为主力HTTP客户端。一旦获得有效Cookie后续所有的轮询请求都应使用requests发起。它比Selenium轻量无数倍速度更快资源占用极低是批量轮询的理想选择。状态存储: 使用本地JSON文件来存储博主列表、作品数等状态信息。虽然数据库如SQLite更强大但对于这个场景JSON文件简单、直观、无需额外服务更符合轻量化的需求。文件结构设计如下{ last_update: 2023-10-27T10:30:00, following_count: 356, monitoring_list: [ { sec_uid: MS4wLjABAAAAxxxxx, uid: 123456789, nickname: 博主A, aweme_count: 150, last_checked: 2023-10-27T10:25:00 }, // ... 更多博主 ] }通知模块: 根据运行环境灵活选择。在Windows桌面环境可以使用win10toast显示系统通知在服务器环境则可以集成邮件smtplib、Telegram Bot、企业微信/钉钉机器人等。任务调度: 核心是简单的while循环配合time.sleep。对于更复杂的定时、并发需求可以考虑引入schedule库或APScheduler。注意任何自动化工具的使用都必须遵守目标网站的robots.txt协议和服务条款。过度频繁的请求会对服务器造成压力可能导致IP被限制。务必在代码中设置合理的请求间隔并考虑使用代理池等策略来分散请求。2. 核心模块实现详解有了清晰的架构我们来逐一拆解各个核心模块的实现。我会提供经过重构和优化的代码片段并解释其中的关键点。2.1 身份认证与Cookie管理这是整个系统的钥匙。我们的策略是“一次登录长期复用”。使用Selenium完成人工登录然后将关键的Cookie持久化保存。# auth_manager.py import json import time from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By import logging logger logging.getLogger(__name__) class DouyinAuthManager: COOKIE_FILE douyin_cookies.json def __init__(self, headlessFalse): 初始化认证管理器 :param headless: 是否使用无头模式服务器环境建议True self.chrome_options Options() if headless: self.chrome_options.add_argument(--headless) self.chrome_options.add_argument(--disable-gpu) self.chrome_options.add_argument(--no-sandbox) self.chrome_options.add_argument(--disable-dev-shm-usage) # 防止被检测为自动化工具 self.chrome_options.add_experimental_option(excludeSwitches, [enable-automation]) self.chrome_options.add_experimental_option(useAutomationExtension, False) self.chrome_options.add_argument(--disable-blink-featuresAutomationControlled) def login_and_save_cookie(self): 执行登录流程并保存Cookie driver webdriver.Chrome(optionsself.chrome_options) driver.get(https://www.douyin.com) wait WebDriverWait(driver, 30) logger.info(请在弹出的浏览器窗口中手动扫码或登录抖音...) # 等待登录成功通常可以通过页面元素判断例如用户头像出现 try: wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, [data-e2euser-avatar]))) logger.info(登录成功检测到用户头像。) except Exception as e: logger.error(登录超时或未检测到成功标志。) driver.quit() raise e # 获取并保存Cookie cookies driver.get_cookies() with open(self.COOKIE_FILE, w, encodingutf-8) as f: json.dump(cookies, f, ensure_asciiFalse, indent2) logger.info(fCookie已保存至 {self.COOKIE_FILE}) # 顺便获取一次用户关键信息如sec_uid验证Cookie有效性 user_info self._extract_user_info_from_page(driver.page_source) driver.quit() return user_info def load_cookie_to_requests(self): 将保存的Cookie加载到requests的session中 import requests session requests.Session() try: with open(self.COOKIE_FILE, r, encodingutf-8) as f: cookies json.load(f) for cookie in cookies: session.cookies.set(cookie[name], cookie[value]) logger.info(Cookie已加载到session。) except FileNotFoundError: logger.warning(未找到Cookie文件需要先登录。) return None return session def _extract_user_info_from_page(self, page_source): 从页面源码中提取用户sec_uid等信息示例实际需分析页面结构 # 这里需要根据抖音Web端实际的数据渲染方式调整 # 可能是从window._SSR_HYDRATED_DATA或某个script标签中提取 import re import json as json_lib # 示例正则实际模式需自行抓取分析 pattern rwindow\._SSR_HYDRATED_DATA\s*\s*({.*?}); match re.search(pattern, page_source, re.DOTALL) if match: try: data json_lib.loads(match.group(1)) sec_uid data.get(user, {}).get(secUid) return {sec_uid: sec_uid} except Exception as e: logger.error(f解析用户信息失败: {e}) return None这个类的设计将认证逻辑封装起来提供了login_and_save_cookie首次或Cookie失效时调用和load_cookie_to_requests日常运行时调用两个清晰的方法。2.2 博主列表的动态获取与维护系统需要监控的博主列表可能变化新增关注或取关。我们需要一个能同步线上关注列表与本地监控列表的机制。# list_manager.py import json import time import logging from typing import List, Dict, Optional logger logging.getLogger(__name__) class MonitoringListManager: def __init__(self, data_filemonitoring_data.json): self.data_file data_file self.data self._load_data() def _load_data(self) - Dict: 加载本地监控数据 try: with open(self.data_file, r, encodingutf-8) as f: return json.load(f) except FileNotFoundError: # 初始化数据结构 return { meta: {last_full_sync: None, version: 1}, monitoring_list: [], statistics: {total_tracked: 0} } def sync_from_live_following(self, live_following_list: List[Dict], session): 从线上关注列表同步到本地监控列表 :param live_following_list: 从API获取的实时关注列表 :param session: 带Cookie的requests session local_list {item[sec_uid]: item for item in self.data.get(monitoring_list, [])} live_dict {item[sec_uid]: item for item in live_following_list} added [] removed [] # 检查新增关注 for sec_uid, live_info in live_dict.items(): if sec_uid not in local_list: # 新关注的博主获取其详细信息并加入监控 detail self._fetch_user_detail(sec_uid, session) if detail: local_list[sec_uid] detail added.append(detail[nickname]) else: logger.warning(f无法获取博主 {sec_uid} 的详细信息跳过。) # 检查取消关注可选根据需求决定是否从监控列表移除 # for sec_uid, local_info in local_list.items(): # if sec_uid not in live_dict: # removed.append(local_info[nickname]) # del local_list[sec_uid] # 更新本地数据 self.data[monitoring_list] list(local_list.values()) self.data[meta][last_full_sync] time.strftime(%Y-%m-%d %H:%M:%S) self.data[statistics][total_tracked] len(local_list) self._save_data() if added: logger.info(f同步完成。新增监控博主: {, .join(added)}) # if removed: # logger.info(f移出监控博主: {, .join(removed)}) logger.info(f当前总计监控博主数: {len(local_list)}) def _fetch_user_detail(self, sec_uid: str, session) - Optional[Dict]: 根据sec_uid获取博主的详细信息昵称、作品数等 # 这里需要调用抖音的用户信息API # 示例URL: fhttps://www.douyin.com/aweme/v1/web/user/profile/other/?sec_uid{sec_uid} api_url fhttps://www.douyin.com/aweme/v1/web/user/profile/other/ params {sec_uid: sec_uid} headers { User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36, Referer: fhttps://www.douyin.com/user/{sec_uid}, } try: resp session.get(api_url, paramsparams, headersheaders, timeout10) resp.raise_for_status() data resp.json() user_info data.get(user, {}) return { sec_uid: sec_uid, uid: user_info.get(uid), nickname: user_info.get(nickname), aweme_count: user_info.get(aweme_count, 0), last_checked: None, last_aweme_count: user_info.get(aweme_count, 0) } except Exception as e: logger.error(f获取用户 {sec_uid} 详情失败: {e}) return None def _save_data(self): 保存数据到文件 with open(self.data_file, w, encodingutf-8) as f: json.dump(self.data, f, ensure_asciiFalse, indent2)这个管理器负责核心的列表同步逻辑。它会在每次轮询前先检查线上关注列表与本地列表的差异确保监控目标始终是最新的。2.3 高效轮询与差异检测这是系统的引擎。我们需要一个稳健的循环遍历所有监控博主智能地检测更新。# poller.py import time import random import logging from typing import Dict, List from datetime import datetime logger logging.getLogger(__name__) class DouyinPoller: def __init__(self, session, list_manager, notifier, downloader): self.session session self.list_manager list_manager self.notifier notifier self.downloader downloader self.request_interval (3, 7) # 请求间隔随机范围秒 self.poll_cycle_interval 300 # 每轮完整循环后的间隔秒 def start_polling(self): 启动主轮询循环 logger.info(抖音博主监控轮询器启动。) cycle_count 0 while True: cycle_count 1 cycle_start time.time() logger.info(f 开始第 {cycle_count} 轮轮询 ) monitoring_list self.list_manager.data[monitoring_list] total len(monitoring_list) for idx, creator in enumerate(monitoring_list, 1): self._check_single_creator(creator, idx, total) # 在检查每个博主后等待一个随机时间避免请求过于规律 if idx total: time.sleep(random.uniform(*self.request_interval)) cycle_duration time.time() - cycle_start logger.info(f第 {cycle_count} 轮轮询结束耗时 {cycle_duration:.2f} 秒。) # 等待一段时间后开始下一轮 if cycle_duration self.poll_cycle_interval: sleep_time self.poll_cycle_interval - cycle_duration logger.info(f等待 {sleep_time:.2f} 秒后开始下一轮。) time.sleep(sleep_time) def _check_single_creator(self, creator: Dict, current_idx: int, total: int): 检查单个博主的更新情况 sec_uid creator[sec_uid] nickname creator.get(nickname, sec_uid) last_known_count creator.get(last_aweme_count, 0) logger.debug(f[{current_idx}/{total}] 检查博主: {nickname}) try: current_count self._fetch_aweme_count(sec_uid) except Exception as e: logger.error(f获取博主 {nickname} 作品数失败: {e}) # 可以在这里记录失败次数达到阈值后暂时跳过或报警 return if current_count is None: return if current_count last_known_count: logger.info(f 发现更新博主 [{nickname}] 作品数从 {last_known_count} 变为 {current_count}) new_awemes self._fetch_new_awemes(sec_uid, last_known_count, current_count) if new_awemes: for aweme in new_awemes: if self._filter_aweme(aweme): # 应用过滤规则 self.notifier.send(f{nickname} 发布了新视频: {aweme.get(desc, 无标题)}) self.downloader.download(aweme) else: logger.debug(f视频 {aweme.get(aweme_id)} 不符合过滤条件已跳过。) # 更新本地记录 creator[last_aweme_count] current_count creator[last_checked] datetime.now().isoformat() self.list_manager._save_data() elif current_count last_known_count: logger.info(f博主 [{nickname}] 作品数减少可能删除了作品更新记录。) creator[last_aweme_count] current_count self.list_manager._save_data() else: # 作品数无变化 creator[last_checked] datetime.now().isoformat() # 为了减少IO可以累积多次无变化后再统一保存 pass def _fetch_aweme_count(self, sec_uid: str) - Optional[int]: 获取博主的作品总数 # 实现同 list_manager._fetch_user_detail 中的作品数获取部分 # 为了效率可以只请求一个轻量级的API端点只返回作品数 pass def _fetch_new_awemes(self, sec_uid: str, old_count: int, new_count: int) - List[Dict]: 获取新增的作品列表 # 计算需要获取的新作品数量 count_to_fetch new_count - old_count # 调用抖音的作品列表API获取最新的N条作品 # 返回作品信息列表 pass def _filter_aweme(self, aweme: Dict) - bool: 过滤作品根据时长、类型、竖屏等规则 # 示例过滤规则 duration aweme.get(duration, 0) # 单位毫秒 is_vertical aweme.get(ratio, ) 9:16 # 假设比例信息 aweme_type aweme.get(aweme_type, 0) # 0可能代表普通视频 # 规则时长在15秒到5分钟之间竖屏视频类型为普通视频 if 15000 duration 300000 and is_vertical and aweme_type 0: return True return False轮询器的核心是_check_single_creator方法。它遵循“先轻后重”的原则先获取作品总数进行对比只有发现数量增加时才去获取具体的作品详情并进行过滤这能节省大量不必要的请求。3. 性能优化与稳定性保障当监控列表膨胀到几百甚至上千时性能、稳定性和合规性就成为重中之重。这里有几个我实践中总结的关键点。3.1 请求优化与速率限制无节制的请求是封号封IP的捷径。必须实施严格的速率控制和智能间隔。随机化间隔: 在请求间插入随机等待时间如3-7秒模拟人类操作的不规律性。分批次处理: 将几百个博主分成多个批次批次间有更长的休息时间。例如每检查50个博主休息2分钟。错误退避: 当请求失败如遇到429 Too Many Requests时采用指数退避算法增加等待时间。def request_with_backoff(self, url, max_retries5): retry_delay 1 for i in range(max_retries): try: return self.session.get(url, timeout10) except requests.exceptions.RequestException as e: if hasattr(e.response, status_code) and e.response.status_code 429: logger.warning(f触发速率限制等待 {retry_delay} 秒后重试 ({i1}/{max_retries})) time.sleep(retry_delay) retry_delay * 2 # 指数退避 else: raise e raise Exception(达到最大重试次数请求失败。)利用缓存: 对于博主的基本信息如昵称如果不是频繁变化可以缓存在内存或本地避免重复请求。3.2 异常处理与日志记录健壮的系统必须能妥善处理各种异常并留下清晰的日志供排查。结构化日志: 使用logging模块配置不同级别DEBUG, INFO, WARNING, ERROR的输出并输出到文件。import logging logging.basicConfig( levellogging.INFO, format%(asctime)s - %(name)s - %(levelname)s - %(message)s, handlers[ logging.FileHandler(monitor.log, encodingutf-8), logging.StreamHandler() ] )关键异常捕获: 对网络请求、JSON解析、文件IO等可能出错的地方进行精细化的异常捕获和记录。try: current_count self._fetch_aweme_count(sec_uid) except requests.exceptions.Timeout: logger.error(f请求博主 {nickname} 超时可能网络不稳定。) self._record_failure(sec_uid, timeout) return except json.JSONDecodeError as e: logger.error(f解析博主 {nickname} 的返回数据失败: {e}) return except Exception as e: logger.exception(f检查博主 {nickname} 时发生未知错误: {e}) # 记录完整堆栈 return状态持久化与恢复: 定期如每处理完10个博主保存一次监控列表的状态。这样即使程序意外崩溃重启后也能从最近的状态继续而不是从头开始。3.3 通知与下载模块的异步化通知发送和视频下载可能是耗时的I/O操作。如果放在主轮询线程中同步执行会严重拖慢整个监控循环。解决方案是引入消息队列或异步任务。最简单的实现可以使用Python的threading模块或concurrent.futures线程池。# async_dispatcher.py import threading import queue import logging logger logging.getLogger(__name__) class AsyncTaskDispatcher: def __init__(self, max_workers5): self.task_queue queue.Queue() self.max_workers max_workers self.workers [] self._stop_event threading.Event() def start(self): 启动工作线程 for i in range(self.max_workers): worker threading.Thread(targetself._worker_loop, namefWorker-{i}, daemonTrue) worker.start() self.workers.append(worker) logger.info(f异步任务分发器已启动{self.max_workers} 个工作线程。) def submit_task(self, task_type, **kwargs): 提交一个任务到队列 self.task_queue.put((task_type, kwargs)) logger.debug(f新任务已提交: {task_type}) def _worker_loop(self): 工作线程的主循环 while not self._stop_event.is_set(): try: task_type, kwargs self.task_queue.get(timeout1) try: if task_type notify: self._do_notify(**kwargs) elif task_type download: self._do_download(**kwargs) except Exception as e: logger.error(f处理任务 {task_type} 失败: {e}) finally: self.task_queue.task_done() except queue.Empty: continue def _do_notify(self, message, **kwargs): # 实际调用通知模块 pass def _do_download(self, aweme_info, **kwargs): # 实际调用下载模块 pass def stop(self): 停止分发器 self._stop_event.set() for worker in self.workers: worker.join() logger.info(异步任务分发器已停止。) # 在主程序中 dispatcher AsyncTaskDispatcher(max_workers3) dispatcher.start() # 当发现新视频时 dispatcher.submit_task(notify, messagef{nickname} 发布了新视频) dispatcher.submit_task(download, aweme_infonew_aweme)这样主轮询线程只需要将“发现新视频”这个事件丢到队列里就可以立刻继续检查下一个博主实现了检测与处理的解耦大大提升了系统的响应速度。4. 部署与运维实践开发完成只是第一步让系统7x24小时稳定运行才是真正的挑战。4.1 本地与服务器部署选择本地运行Windows/macOS:优点: 调试方便可以利用桌面通知。缺点: 电脑需常开网络不稳定可能中断。建议: 适合监控博主数量不多100的个人用户。可以使用系统自带的**任务计划程序Windows或launchd/cronmacOS**在开机时启动脚本并配合while True循环和异常重启机制。服务器运行Linux:优点: 稳定、持久网络条件好。缺点: 需要一定的Linux运维知识无图形界面。建议: 这是生产环境的首选。使用systemd或Supervisor来管理进程实现开机自启、自动重启、日志轮转。; Supervisor 配置示例 (douyin-monitor.conf) [program:douyin-monitor] command/usr/bin/python3 /path/to/your/main.py directory/path/to/your/ useryour_username autostarttrue autorestarttrue startretries3 stderr_logfile/var/log/douyin-monitor.err.log stdout_logfile/var/log/douyin-monitor.out.log4.2 监控与告警系统本身是监控者也需要被监控。心跳检测: 让脚本定期如每小时向一个健康检查端点发送信号或者在一个固定位置更新状态文件。另一个监控脚本可以检查这个心跳是否正常。关键指标日志: 在日志中记录每轮轮询的耗时、检查的博主数量、发现的更新数量、请求失败率等。便于后期分析性能瓶颈。外部告警: 如果脚本崩溃或长时间没有新视频这可能意味着轮询已停止可以通过邮件、Server酱、Bark等渠道给自己发送告警信息。4.3 长期维护与迭代Cookie失效处理: Cookie会过期。可以在代码中检测“未登录”状态如请求返回登录页然后触发重新登录流程并更新Cookie文件。对于服务器这可能需要结合无头浏览器或手动干预。API变更应对: 抖音的接口可能会变化。将API URL、请求头、数据解析逻辑集中放在配置类或单独模块中一旦变化只需修改一处。数据备份: 定期备份本地的monitoring_data.json文件防止数据丢失。功能扩展点:多账号监控: 改造auth_manager支持多个账号的Cookie轮换使用分散请求压力。更丰富的过滤规则: 基于视频标题关键词、话题标签、发布时间段等进行过滤。数据分析: 将抓取到的视频信息发布时间、点赞、评论等存入数据库进行简单的趋势分析。构建这样一个系统更像是在和平台进行一场谨慎的“对话”。你需要理解它的规则尊重它的边界然后用技术优雅地实现自己的需求。代码本身会不断迭代但其中关于效率、稳定性和可维护性的思考才是最有价值的部分。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2420952.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!