Fetching Splunk query data for a single session ID is already implemented in Python. The next step is bulk retrieval: given a large list of Session IDs, fetch the results quickly and efficiently, with exception handling and multithreaded/asynchronous processing for performance, write the resulting data into a MySQL table, and record the success or failure status of each fetch in a .log text file.
The solution below is designed to be flexible and extensible: the data-processing logic, database schema, and concurrency strategy can all be adjusted to specific business needs.
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import pymysql
from pymysql import MySQLError
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB  # connection pooling (pip install DBUtils)
# Configure logging: write to a .log file and to the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('splunk_import.log'),
        logging.StreamHandler()
    ]
)
# Splunk configuration
SPLUNK_API_URL = "https://your.splunk.url/services/search/results"
SPLUNK_HEADERS = {
    "Content-Type": "application/json",
    # add any other required request headers (e.g. authentication)
}

# MySQL configuration
MYSQL_CONFIG = {
    "host": "localhost",
    "user": "root",
    "password": "password",
    "database": "splunk_data",
    "charset": "utf8mb4",
    "cursorclass": DictCursor
}
# Thread-safe MySQL connection pool. PyMySQL has no built-in pool,
# so DBUtils' PooledDB is used on top of it.
class MySQLPool:
    _lock = threading.Lock()
    _pool = None

    @classmethod
    def get_connection(cls):
        with cls._lock:
            if cls._pool is None:
                cls._pool = PooledDB(
                    creator=pymysql,    # module used to open raw connections
                    mincached=2,        # idle connections kept ready in the pool
                    maxconnections=10,  # upper bound on open connections
                    blocking=True,      # wait for a free connection instead of raising
                    **MYSQL_CONFIG
                )
        # connection() hands out a pooled connection; close() returns it to the pool
        return cls._pool.connection()
# Fetch data from Splunk for a single session ID
def fetch_splunk_data(session_id):
    try:
        params = {
            "output_mode": "json",
            "session_id": session_id
        }
        response = requests.get(
            SPLUNK_API_URL,
            headers=SPLUNK_HEADERS,
            params=params,
            timeout=30
        )
        response.raise_for_status()
        return {
            "session_id": session_id,
            "data": response.json(),
            "success": True
        }
    except Exception as e:
        return {
            "session_id": session_id,
            "error": str(e),
            "success": False
        }
# Parse the Splunk response and store it in MySQL
def process_session(session_id):
    result = fetch_splunk_data(session_id)
    if not result['success']:
        logging.error(f"Session {session_id} failed: {result.get('error')}")
        return False
    try:
        # Parse the payload (adjust to the actual data structure)
        parsed_data = {
            "session_id": session_id,
            "event_count": len(result['data'].get('results', [])),
            "status": "success",
            # add any other fields that need to be stored
        }
        # Write to MySQL
        with MySQLPool.get_connection() as conn:
            with conn.cursor() as cursor:
                sql = """INSERT INTO splunk_results
                         (session_id, event_count, status)
                         VALUES (%s, %s, %s)"""
                cursor.execute(sql, (
                    parsed_data['session_id'],
                    parsed_data['event_count'],
                    parsed_data['status']
                ))
            conn.commit()
        logging.info(f"Session {session_id} processed successfully")
        return True
    except MySQLError as e:
        logging.error(f"MySQL error on {session_id}: {str(e)}")
        return False
    except Exception as e:
        logging.error(f"Processing error on {session_id}: {str(e)}")
        return False
# Batch processing controller
def batch_process(session_ids, max_workers=10):
    success_count = 0
    failure_count = 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_session, sid): sid for sid in session_ids}
        for future in as_completed(futures):
            sid = futures[future]
            try:
                if future.result():
                    success_count += 1
                else:
                    failure_count += 1
            except Exception as e:
                logging.error(f"Unexpected error processing {sid}: {str(e)}")
                failure_count += 1
    logging.info(f"Processing completed. Success: {success_count}, Failed: {failure_count}")
# Example usage
if __name__ == "__main__":
    # Read the Session ID list from a file (example)
    with open("session_ids.txt") as f:
        session_ids = [line.strip() for line in f if line.strip()]
    # Start batch processing (tune the concurrency level as needed)
    batch_process(session_ids, max_workers=15)
Key features:
- Multithreaded processing:
  - Uses ThreadPoolExecutor to manage the thread pool
  - Maximum concurrency is configurable via max_workers
  - Tracks task submission and collects results as they complete
- MySQL connection management:
  - Simple connection-pool pattern (DBUtils PooledDB on top of PyMySQL)
  - Thread-safe database access
  - Connections are acquired from and returned to the pool automatically
- Exception handling:
  - Network/request errors
  - Database errors
  - Catch-all exception handling with logging
- Logging:
  - Output goes to both the log file and the console
  - Entries include timestamps and error details
  - The logging module is thread-safe
- Performance:
  - Connection reuse reduces per-request overhead
  - Concurrent task execution
  - Aggregated success/failure reporting
Usage:
- Install dependencies:
pip install requests pymysql DBUtils
- Create the MySQL table:
CREATE TABLE splunk_results (
    id INT AUTO_INCREMENT PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    event_count INT,
    status VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
- Configuration:
  - Update SPLUNK_API_URL and the request headers
  - Adjust the MySQL connection parameters
  - Adapt the data-parsing logic to the actual Splunk response structure
Optimization suggestions:
- Rate limiting:
# Add a small delay inside fetch_splunk_data to throttle requests
import time
time.sleep(0.1)  # caps each worker thread at roughly 10 requests per second
- Retry logic (pip install tenacity):
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def fetch_splunk_data(session_id):
    ...  # original function body unchanged
- Batched inserts:
# In process_session, accumulate rows and write them with executemany()
# instead of issuing one INSERT per session. The batch is shared across
# worker threads, so guard it with a lock.
batch_size = 100
batch = []
batch_lock = threading.Lock()

def save_batch(rows):
    with MySQLPool.get_connection() as conn:
        with conn.cursor() as cursor:
            sql = """INSERT INTO splunk_results
                     (session_id, event_count, status)
                     VALUES (%s, %s, %s)"""
            cursor.executemany(sql, rows)
        conn.commit()

# While processing, append each parsed row and flush when the batch is full
with batch_lock:
    batch.append((parsed_data['session_id'], parsed_data['event_count'], parsed_data['status']))
    if len(batch) >= batch_size:
        save_batch(batch)
        batch.clear()
- Fully asynchronous version:
Consider a fully asynchronous implementation built on aiohttp for the Splunk requests and aiomysql for MySQL (asyncpg targets PostgreSQL, not MySQL); removing the thread overhead can push throughput further. A minimal sketch follows below.
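The sketch below is a minimal illustration of that async approach, not part of the original script: it assumes pip install aiohttp aiomysql, reuses SPLUNK_API_URL, SPLUNK_HEADERS, the logging setup and the splunk_results table defined above, and caps concurrency with an asyncio.Semaphore. The names CONCURRENCY, fetch_one and run_async are illustrative only.

# Minimal async sketch (assumes: pip install aiohttp aiomysql)
import asyncio
import aiohttp
import aiomysql

CONCURRENCY = 15  # analogous to max_workers in the threaded version

async def fetch_one(session, sem, sid):
    # Fetch a single session's results, mirroring fetch_splunk_data()
    async with sem:
        async with session.get(
                SPLUNK_API_URL,
                headers=SPLUNK_HEADERS,
                params={"output_mode": "json", "session_id": sid},
                timeout=aiohttp.ClientTimeout(total=30)) as resp:
            resp.raise_for_status()
            return sid, await resp.json()

async def run_async(session_ids):
    sem = asyncio.Semaphore(CONCURRENCY)
    pool = await aiomysql.create_pool(
        host="localhost", user="root", password="password",
        db="splunk_data", charset="utf8mb4", autocommit=True)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, sem, sid) for sid in session_ids]
        for coro in asyncio.as_completed(tasks):
            try:
                sid, data = await coro
                async with pool.acquire() as conn:
                    async with conn.cursor() as cur:
                        await cur.execute(
                            "INSERT INTO splunk_results (session_id, event_count, status) "
                            "VALUES (%s, %s, %s)",
                            (sid, len(data.get("results", [])), "success"))
                logging.info(f"Session {sid} processed successfully")
            except Exception as e:
                logging.error(f"Async processing error: {e}")
    pool.close()
    await pool.wait_closed()

# asyncio.run(run_async(session_ids))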
Monitoring suggestions:
- Track processing progress through the log file
- Export Prometheus metrics (e.g. via the prometheus_client library)
- Display a progress bar with tqdm (see the sketch below)
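For the progress-bar suggestion, the snippet below is a small sketch of how the as_completed loop inside batch_process could be wrapped with tqdm (assuming pip install tqdm); the counters and logging stay exactly as in the script above.

# Sketch: progress bar inside batch_process (assumes: pip install tqdm)
from tqdm import tqdm

with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {executor.submit(process_session, sid): sid for sid in session_ids}
    # tqdm wraps the as_completed iterator; total= enables percentage and ETA display
    for future in tqdm(as_completed(futures), total=len(futures), desc="sessions"):
        sid = futures[future]
        try:
            if future.result():
                success_count += 1
            else:
                failure_count += 1
        except Exception as e:
            logging.error(f"Unexpected error processing {sid}: {str(e)}")
            failure_count += 1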