Python爬虫实战:用Requests+Pandas批量抓取东方财富网全板块股票数据(附完整源码)
Python爬虫实战构建东方财富网股票数据自动化采集系统在金融数据分析领域获取全面、准确的股票市场数据是量化交易、投资研究和市场监控的基础。对于Python开发者而言如何高效地从东方财富网这类金融门户批量获取全板块股票数据并将其转化为结构化的分析素材是一项极具实用价值的技能。本文将带你从零构建一个工程化的数据采集系统不仅实现单次抓取更注重代码的模块化设计、异常处理和长期维护性满足定期更新数据集的需求。1. 系统架构设计与核心模块一个健壮的爬虫系统应当遵循高内聚低耦合的设计原则。我们将整个项目分解为以下核心模块# 项目目录结构 eastmoney_crawler/ ├── config.py # 配置文件请求头、URL模板等 ├── utils.py # 工具函数网络请求、数据处理 ├── parser.py # 数据解析器 ├── storage.py # 数据存储模块 ├── scheduler.py # 任务调度器 └── main.py # 主程序入口这种模块化设计带来的优势包括可维护性每个模块功能明确修改不影响其他部分可扩展性新增板块或字段只需修改对应模块容错能力单个模块失败不会导致整个系统崩溃2. 深度解析东方财富网API接口东方财富网的股票数据通过动态API接口提供我们需要先理解其请求参数规律2.1 关键请求参数分析通过浏览器开发者工具抓包分析可以发现核心参数包括参数名示例值说明pn1页码pz20每页数据量fidf3数据类别标识fsm:0t:6市场板块筛选条件fieldsf1,f2返回字段列表特别值得注意的是fs参数它通过特定编码表示不同市场板块# 板块编码映射字典 MARKET_CODES { 沪深京A股: m:0t:6,m:0t:80,m:1t:2,m:1t:23,m:0t:81s:2048, 上证A股: m:1t:2,m:1t:23, 创业板: m:0t:80, 科创板: m:1t:23, # 其他板块... }2.2 处理JSONP响应数据东方财富网API返回的是JSONP格式数据需要特殊处理def parse_jsonp(jsonp_str): 解析JSONP字符串为Python字典 :param jsonp_str: jQuery112...({...}) :return: dict try: # 去除回调函数包裹 json_str re.sub(r^.*?\(|\);$, , jsonp_str) return json.loads(json_str) except (AttributeError, json.JSONDecodeError) as e: raise ValueError(fJSONP解析失败: {str(e)})注意实际项目中应该添加更完善的错误处理和日志记录而非简单打印异常。3. 工程化实现细节3.1 网络请求的稳健性设计使用requests.Session()保持会话并实现自动重试机制class EastMoneyDownloader: def __init__(self): self.session requests.Session() self.session.headers.update(DEFAULT_HEADERS) self.retry_max 3 self.timeout 15 def fetch_data(self, url, paramsNone): for attempt in range(self.retry_max): try: resp self.session.get(url, paramsparams, timeoutself.timeout) resp.raise_for_status() return resp.text except (requests.RequestException, TimeoutError) as e: if attempt self.retry_max - 1: raise time.sleep(2 ** attempt)3.2 分页控制与终止条件通过分析API行为我们发现当请求超出最大页码时返回的data字段为nulldef crawl_market_stocks(market_code, max_pages1000): stocks [] for page in range(1, max_pages 1): data fetch_market_page(market_code, page) if data[data] is None: # 终止条件 break stocks.extend(parse_stock_items(data[data][diff])) time.sleep(1) # 礼貌性延迟 return pd.DataFrame(stocks)3.3 数据存储优化相比原文中直接保存多个Excel文件我们采用更专业的存储方案def save_to_database(df, market_name): 将数据保存到SQLite数据库 with sqlite3.connect(stocks.db) as conn: df.to_sql( namestock_data, conconn, if_existsappend, indexFalse, dtype{ 代码: TEXT PRIMARY KEY, 名称: TEXT, 最新价: REAL, # 其他字段... } )或者按日期分文件存储def save_to_parquet(df, market_name): 保存为Parquet格式节省空间且支持分区 date_str datetime.now().strftime(%Y%m%d) path fdata/{date_str}/{market_name}.parquet df.to_parquet(path, enginepyarrow)4. 高级功能扩展4.1 增量采集与数据更新对于定期运行的任务实现增量采集能大幅提高效率def get_existing_codes(market_name): 获取已存储的股票代码 try: existing_df pd.read_parquet(fdata/latest/{market_name}.parquet) return set(existing_df[代码]) except FileNotFoundError: return set() def incremental_update(market_code, market_name): existing_codes get_existing_codes(market_name) new_stocks [] for page in range(1, 1000): data fetch_market_page(market_code, page) if not data[data]: break for item in data[data][diff]: if item[f12] not in existing_codes: new_stocks.append(parse_stock_item(item)) return pd.DataFrame(new_stocks)4.2 代理IP与反反爬策略针对可能出现的反爬机制我们可以集成代理IP池class ProxyManager: def __init__(self, proxy_list): self.proxies cycle(proxy_list) def get_proxy(self): return next(self.proxies) # 在下载器中集成 def fetch_with_proxy(self, url, paramsNone): proxy self.proxy_manager.get_proxy() try: resp self.session.get(url, paramsparams, proxies{http: proxy, https: proxy}, timeoutself.timeout) return resp.text except: self.blacklist_proxy(proxy) raise4.3 监控与报警系统添加简单的运行监控def send_alert(message): 通过邮件或Webhook发送报警 pass def monitor_job(): start_time time.time() try: main() except Exception as e: send_alert(f爬虫失败: {str(e)}) finally: duration time.time() - start_time if duration 3600: # 超过1小时 send_alert(f爬虫运行时间异常: {duration:.2f}秒)5. 完整系统集成将所有模块组合成可配置的爬虫系统# config.py CONFIG { markets: { 沪深京A股: f3fsm:0t:6,m:0t:80,m:1t:2,m:1t:23,m:0t:81s:2048, # 其他市场配置... }, output_format: parquet, # 或 database concurrency: 3, # 并发线程数 request_delay: 1, # 请求间隔 } # main.py def main(): config load_config() downloader EastMoneyDownloader( retry_maxconfig[retry_max], proxiesconfig.get(proxies) ) with ThreadPoolExecutor(max_workersconfig[concurrency]) as executor: futures [] for market_name, market_code in config[markets].items(): future executor.submit( crawl_market, downloader, market_code, market_name, config ) futures.append(future) for future in as_completed(futures): try: result future.result() save_result(result, config[output_format]) except Exception as e: logging.error(f任务失败: {str(e)})这套系统不仅解决了单次采集的需求更提供了定时任务支持可集成到Airflow等调度系统分布式扩展能力通过Redis实现任务队列数据质量检查采集完成后自动验证数据完整性可视化监控通过Grafana展示采集指标对于想要进一步优化的开发者可以考虑使用异步IOaiohttp提高IO密集型任务的效率添加数据清洗管道处理异常值和缺失数据实现自动化测试确保代码变更不会破坏现有功能使用Docker容器化部署方便环境管理
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2476969.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!